O cliente oficial em Rust para conexão com o ClickHouse, desenvolvido originalmente por Paul Loyd. O código-fonte do cliente está disponível no repositório no GitHub.
- Usa
serde para codificar/decodificar linhas.
- Suporta atributos do
serde: skip_serializing, skip_deserializing, rename.
- Usa o formato
RowBinary no transporte HTTP.
- Há planos para migrar para
Native sobre TCP.
- Suporta TLS (por meio das features
native-tls e rustls-tls).
- Suporta compressão e descompressão (LZ4).
- Fornece APIs para selecionar ou inserir dados, executar DDLs e fazer batching no cliente.
- Fornece mocks úteis para testes unitários.
Para usar este crate, adicione o seguinte ao seu Cargo.toml:
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
Veja também: página no crates.io.
lz4 (ativado por padrão) — habilita as variantes Compression::Lz4 e Compression::Lz4Hc(_). Quando ativado, Compression::Lz4 é usado por padrão em todas as queries, exceto para WATCH.
native-tls — oferece suporte a URLs com o schema HTTPS via hyper-tls, que faz link com o OpenSSL.
rustls-tls — oferece suporte a URLs com o schema HTTPS via hyper-rustls, que não faz link com o OpenSSL.
inserter — habilita client.inserter().
test-util — adiciona mocks. Veja o exemplo. Use-o apenas em dev-dependencies.
watch — habilita a funcionalidade client.watch. Consulte a seção correspondente para mais detalhes.
uuid — adiciona serde::uuid para funcionar com a crate uuid.
time — adiciona serde::time para funcionar com a crate time.
Ao se conectar ao ClickHouse por meio de uma URL HTTPS, o recurso native-tls ou rustls-tls deve estar habilitado.
Se ambos estiverem habilitados, o recurso rustls-tls terá precedência.
Compatibilidade entre versões do ClickHouse
O cliente é compatível com versões LTS ou mais recentes do ClickHouse, bem como com o ClickHouse Cloud.
O servidor ClickHouse anterior à v22.6 processa o RowBinary de forma incorreta em alguns casos raros.
Você pode usar a v0.11+ e habilitar o recurso wa-37420 para resolver esse problema. Observação: esse recurso não deve ser usado com versões mais recentes do ClickHouse.
Nosso objetivo é abranger vários cenários de uso do cliente com os examples no repositório do cliente. A visão geral está disponível no README dos examples.
Se algo não estiver claro ou estiver faltando nos examples ou na documentação a seguir, sinta-se à vontade para entrar em contato conosco.
O crate ch2rs é útil para gerar um tipo de linha com base no ClickHouse.
Criando uma instância de cliente
Reutilize os clientes criados ou clone-os para reaproveitar o pool de conexões subjacente do hyper.
use clickhouse::Client;
let client = Client::default()
// deve incluir tanto o protocolo quanto a porta
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");
Conexão HTTPS ou ClickHouse Cloud
O HTTPS funciona com os recursos do Cargo rustls-tls ou native-tls.
Em seguida, crie o cliente normalmente. Neste exemplo, as variáveis de ambiente são usadas para armazenar os detalhes da conexão:
A URL deve incluir o protocolo e a porta, por exemplo, https://instance.clickhouse.cloud:8443.
fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));
Veja também:
use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- O marcador
?fields é substituído por no, name (campos de Row).
- O marcador
? é substituído pelos valores nas chamadas bind() a seguir.
- Os métodos práticos
fetch_one::<Row>() e fetch_all::<Row>() podem ser usados para obter a primeira linha ou todas as linhas, respectivamente.
sql::Identifier pode ser usado para associar nomes de tabelas.
Observação: como toda a resposta é transmitida por streaming, os cursores podem retornar um erro mesmo depois de produzir algumas linhas. Se isso acontecer no seu caso de uso, você pode tentar query(...).with_option("wait_end_of_query", "1") para ativar a bufferização da resposta no servidor. Mais detalhes. A opção buffer_size também pode ser útil.
Use wait_end_of_query com cautela ao selecionar linhas, pois isso pode levar a um maior consumo de memória no servidor e provavelmente reduzirá o desempenho geral.
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- Se
end() não for chamado, o INSERT será abortado.
- As linhas são enviadas progressivamente como um stream para distribuir a carga de rede.
- O ClickHouse insere lotes de forma atômica somente se todas as linhas couberem na mesma partição e se a quantidade delas for menor que
max_insert_block_size.
Async insert (batching no servidor)
Você pode usar as inserções assíncronas do ClickHouse para evitar o batching no cliente dos dados recebidos. Isso pode ser feito simplesmente fornecendo a opção async_insert ao método insert (ou até mesmo à própria instância Client, para que isso afete todas as chamadas de insert).
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
Veja também:
Recurso Inserter (batching no cliente)
Requer o recurso inserter do Cargo.
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
// não se esqueça de finalizar o inserter durante o encerramento da aplicação
// e fazer o commit das linhas restantes. `.end()` também fornecerá as estatísticas.
inserter.end().await?;
Inserter encerra a inserção ativa em commit() se qualquer um dos limites (max_bytes, max_rows, period) for atingido.
- O intervalo entre o encerramento de
INSERTs ativos pode ser ajustado com with_period_bias para evitar picos de carga causados por insertores paralelos.
Inserter::time_left() pode ser usado para detectar quando o período atual termina. Chame Inserter::commit() novamente para verificar os limites se o seu stream emitir itens com pouca frequência.
- Os limiares de tempo são implementados com o crate quanta para acelerar o
inserter. Ele não é usado se test-util estiver habilitado (assim, o tempo pode ser controlado por tokio::time::advance() em testes personalizados).
- Todas as linhas entre chamadas de
commit() são inseridas na mesma instrução INSERT.
Não se esqueça de fazer flush se quiser encerrar/finalizar a inserção:
Em uma implantação de nó único, basta executar as DDLs assim:
client.query("DROP TABLE IF EXISTS some").execute().await?;
No entanto, em implantações em cluster com balanceador de carga ou no ClickHouse Cloud, recomenda-se aguardar a aplicação do DDL em todas as réplicas usando a opção wait_end_of_query. Isso pode ser feito assim:
client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;
Configurações do ClickHouse
Você pode aplicar diversas configurações do ClickHouse usando o método with_option. Por exemplo:
let numbers = client
.query("SELECT number FROM system.numbers")
// Esta configuração será aplicada somente a esta consulta específica;
// ela substituirá a configuração global do cliente.
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;
Além de query, isso funciona de maneira semelhante nos métodos insert e inserter; além disso, o mesmo método pode ser chamado na instância Client para definir configurações globais para todas as consultas.
Com .with_option, você pode definir a opção query_id para identificar consultas no log de consultas do ClickHouse.
let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;
Além de query, ele também funciona de forma semelhante com os métodos insert e inserter.
Se você definir query_id manualmente, certifique-se de que ele seja único. UUIDs são uma boa escolha para isso.
Veja também: exemplo de query_id no repositório do cliente.
Assim como em query_id, você pode definir session_id para executar as instruções na mesma sessão. session_id pode ser definido globalmente no nível do cliente ou por chamada de query, insert ou inserter.
let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", "my-session");
Em implantações em cluster, devido à ausência de “sticky sessions”, você precisa estar conectado a um nó específico do cluster para usar esse recurso corretamente, pois, por exemplo, um balanceador de carga round-robin não garante que as solicitações subsequentes sejam processadas pelo mesmo nó do ClickHouse.
Veja também: exemplo de session_id no repositório do cliente.
Se você estiver usando autenticação de proxy ou precisar enviar cabeçalhos personalizados, pode fazer assim:
let client = Client::default()
.with_url("http://localhost:8123")
.with_header("X-My-Header", "hello");
Veja também: exemplo de cabeçalhos HTTP personalizados no repositório do cliente.
Cliente HTTP personalizado
Isso pode ser útil para ajustar as configurações internas do pool de conexões HTTP.
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;
let connector = HttpConnector::new(); // ou HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// Por quanto tempo manter um socket idle específico ativo no lado do cliente (em milissegundos).
// Deve ser consideravelmente menor que o timeout de KeepAlive do servidor ClickHouse,
// que era 3 segundos por padrão para versões anteriores à 23.11, e 10 segundos a partir daí.
.pool_idle_timeout(Duration::from_millis(2_500))
// Define o número máximo de conexões Keep-Alive idle permitidas no pool.
.pool_max_idle_per_host(4)
.build(connector);
let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
Este exemplo depende da API Hyper legada e está sujeito a mudanças no futuro.
Veja também: exemplo de cliente HTTP personalizado no repositório do cliente.
Veja também estes exemplos adicionais:
(U)Int(8|16|32|64|128) corresponde aos tipos (u|i)(8|16|32|64|128) equivalentes, ou a newtypes baseados neles.
(U)Int256 não tem suporte direto, mas há uma solução alternativa.
Float(32|64) corresponde aos tipos f(32|64) equivalentes, ou a newtypes baseados neles.
Decimal(32|64|128) corresponde aos tipos i(32|64|128) equivalentes, ou a newtypes baseados neles. É mais prático usar fixnum ou outra implementação de números com sinal em ponto fixo.
Boolean corresponde a bool ou a newtypes baseados nele.
String corresponde a qualquer tipo de string ou bytes, por exemplo, &str, &[u8], String, Vec<u8> ou SmartString. Newtypes também têm suporte. Para armazenar bytes, considere usar serde_bytes, pois é mais eficiente.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
FixedString(N) tem suporte como um array de bytes, por exemplo [u8; N].
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}
#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
UUID é mapeado de e para uuid::Uuid usando serde::uuid. Requer o recurso uuid.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
Date é mapeado de/para u16 ou um newtype baseado nele e representa um número de dias decorridos desde 1970-01-01. Além disso, time::Date também é compatível com o uso de serde::time::date, o que requer o recurso time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
Date32 é mapeado de/para i32 ou um newtype baseado nele e representa um número de dias decorridos desde 1970-01-01. Além disso, time::Date também é compatível com o uso de serde::time::date32, o que requer o recurso time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
DateTime é convertido de/para u32 ou um newtype baseado nele e representa um número de segundos decorridos desde a epoch UNIX. Além disso, time::OffsetDateTime é compatível ao usar serde::time::datetime, o que requer o recurso time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
DateTime64(_) é mapeado de/para i32 ou um newtype baseado nele e representa o tempo decorrido desde a epoch UNIX. Além disso, time::OffsetDateTime também tem suporte usando serde::time::datetime64::*, o que exige a feature time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // s/us/ms/ns decorridos dependendo de `DateTime64(X)`
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
Tuple(A, B, ...) é mapeado de/para (A, B, ...) ou um newtype sobre ele.
Array(_) é mapeado de/para qualquer slice, por exemplo Vec<_>, &[_]. newtypes também são compatíveis.
Map(K, V) se comporta como Array((K, V)).
LowCardinality(_) tem suporte transparente.
Nullable(_) é mapeado de/para Option<_>. Para os helpers clickhouse::serde::*, adicione ::option.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
- Há suporte a
Nested ao fornecer vários arrays com renomeação.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
- Há suporte a tipos
Geo. Point se comporta como uma tupla (f64, f64), e os demais tipos são apenas sequências de pontos.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
- Os tipos de dados
Variant, Dynamic e o (novo) JSON ainda não têm suporte.
O crate fornece utilitários para simular o servidor CH e testar consultas DDL, SELECT, INSERT e WATCH. Esse recurso pode ser habilitado com o recurso test-util. Use-a apenas como dev-dependency.
Veja o exemplo.
A causa mais comum do erro CANNOT_READ_ALL_DATA é que a definição da linha no lado da aplicação não corresponde à do ClickHouse.
Considere a seguinte tabela:
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
Então, se EventLog estiver definido no lado da aplicação com tipos incompatíveis, por exemplo:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- deve ser u32 em vez disso!
}
Ao inserir os dados, pode ocorrer o seguinte erro:
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
Neste exemplo, isso é resolvido com a definição correta da estrutura EventLog:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}
- Os tipos de dados
Variant, Dynamic e JSON (novo) ainda não são suportados.
- O binding de parâmetros no servidor ainda não é suportado; consulte esta issue para acompanhar.
Se você tiver alguma dúvida ou precisar de ajuda, fique à vontade para falar conosco no Slack da comunidade ou pelas issues do GitHub.