ClickHouse API 的所有代码示例都可以在这里找到。
有关连接配置,请参见 配置。
有关支持的数据类型和 Go 类型映射,请参见 数据类型。
下面的示例会返回 ClickHouse 服务器版本,并演示如何连接到 ClickHouse——前提是 ClickHouse 未启用安全保护,且可使用默认用户 default 访问。
请注意,我们使用默认的原生端口进行连接。
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
fmt.Println(v)
完整示例
在后续所有示例中,除非明确说明,否则均假定已创建并可使用 ClickHouse conn 变量。
可以通过 Exec 方法执行任意语句。这适用于 DDL 和简单语句。不建议将其用于较大的插入操作或查询结果迭代。
conn.Exec(context.Background(), `DROP TABLE IF EXISTS example`)
err = conn.Exec(context.Background(), `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
conn.Exec(context.Background(), "INSERT INTO example VALUES (1, 'test-1')")
完整示例
请注意,可以向查询传递 Context。可使用它传递特定的查询级设置,参见使用 Context。
要插入大量行,客户端提供了批次处理机制。这需要先准备一个批次,然后即可向其中追加行,最后通过 Send() 方法发送。在执行 Send 之前,批次数据会一直保存在内存中。
建议对批次调用 Close,以避免连接泄漏。这可以在准备完批次后通过 defer 关键字实现。如果始终没有调用 Send,它会负责清理连接。请注意,如果没有追加任何行,查询日志中会显示 0 行插入记录。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8
, Col2 String
, Col3 FixedString(3)
, Col4 UUID
, Col5 Map(String, UInt8)
, Col6 Array(String)
, Col7 Tuple(String, UInt8, Array(Map(String, String)))
, Col8 DateTime
) Engine = Memory
`)
if err != nil {
return err
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1000; i++ {
err := batch.Append(
uint8(42),
"ClickHouse",
"Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
{"key": "value"},
{"key": "value"},
{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return batch.Send()
完整示例
有关 ClickHouse 的建议请参见此处。批次不应在多个 Go 协程之间共享——应为每个协程分别创建一个批次。
从上面的示例可以看出,追加行时,变量类型需要与列类型一致。虽然这种映射通常一目了然,但该接口会尽量保持灵活性,只要不会造成精度损失,就会进行类型转换。例如,下面演示了如何将字符串插入 datetime64 列。
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1000; i++ {
err := batch.Append(
"2006-01-02 15:04:05.999",
)
if err != nil {
return err
}
}
return batch.Send()
完整示例
有关每种列类型所支持的 Go 类型的完整概览,请参阅类型转换。
临时列是只写列,只在写入期间存在——不会被存储,也无法被查询。它们可用于在写入时计算派生列的值。
ctx := context.Background()
ddl := `
CREATE OR REPLACE TABLE test
(
id UInt64,
unhexed String EPHEMERAL,
hexed FixedString(4) DEFAULT unhex(unhexed)
)
ENGINE = MergeTree
ORDER BY id`
if err := conn.Exec(ctx, ddl); err != nil {
return err
}
// 插入时提供临时列的值
if err := conn.Exec(ctx, "INSERT INTO test (id, unhexed) VALUES (1, '5a90b714')"); err != nil {
return err
}
// 只能查询非临时列
rows, err := conn.Query(ctx, "SELECT id, hexed, hex(hexed) FROM test")
完整示例
你既可以使用 QueryRow 方法查询单行,也可以通过 Query 获取用于遍历结果集的游标。前者接受一个目标端,用于将数据序列化到其中;而后者则需要对每一行调用 Scan。
row := conn.QueryRow(context.Background(), "SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 []interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
完整示例
rows, err := conn.Query(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= 2")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
if err := rows.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
}
rows.Close()
return rows.Err()
完整示例
请注意,这两种情况下都需要传入变量的指针,以便将相应的列值反序列化到这些变量中。这些变量必须按 SELECT 语句中指定的顺序传入;如果使用如上所示的 SELECT *,默认会采用列声明顺序。
与插入类似,Scan 方法要求目标变量具有合适的类型。这里同样尽量保持灵活性:在不会造成精度损失的前提下,会尽可能进行类型转换。例如,上面的示例展示了将 UUID 列读取到字符串变量中。有关每种列类型支持的完整 Go 类型列表,请参阅类型转换。
最后,请注意,Query 和 QueryRow 方法支持传入 Context。这可用于设置查询级别的参数;更多详情请参阅使用 Context。
支持通过 Async 方法进行异步插入。这样,用户可以指定客户端是等待服务器完成插入,还是在收到数据后立即返回响应。这实际上控制的是参数 wait_for_async_insert。
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 21, 12, 0); err != nil {
return nil
}
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(ctx, `DROP TABLE IF EXISTS example`)
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if err := conn.Exec(ctx, ddl); err != nil {
return err
}
for i := 0; i < 100; i++ {
if err := conn.AsyncInsert(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"), false); err != nil {
return err
}
}
完整示例
支持按列格式插入。如果数据本身已经是按这种结构组织的,则无需再转换为行格式,因此可以提升性能。
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
var (
col1 []uint64
col2 []string
col3 [][]uint8
col4 []time.Time
)
for i := 0; i < 1_000; i++ {
col1 = append(col1, uint64(i))
col2 = append(col2, "Golang SQL database driver")
col3 = append(col3, []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9})
col4 = append(col4, time.Now())
}
if err := batch.Column(0).Append(col1); err != nil {
return err
}
if err := batch.Column(1).Append(col2); err != nil {
return err
}
if err := batch.Column(2).Append(col3); err != nil {
return err
}
if err := batch.Column(3).Append(col4); err != nil {
return err
}
return batch.Send()
完整示例
对于用户来说,Golang 结构体是 ClickHouse 中一行数据的逻辑表示。为便于处理,原生接口提供了几个便捷函数。
Select 方法支持通过一次调用,将一组返回行编组为结构体切片。
var result []struct {
Col1 uint8
Col2 string
ColumnWithName time.Time `ch:"Col3"`
}
if err = conn.Select(ctx, &result, "SELECT Col1, Col2, Col3 FROM example"); err != nil {
return err
}
for _, v := range result {
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", v.Col1, v.Col2, v.ColumnWithName)
}
完整示例
ScanStruct 可将查询返回的单行数据映射到结构体中。
var result struct {
Col1 int64
Count uint64 `ch:"count"`
}
if err := conn.QueryRow(context.Background(), "SELECT Col1, COUNT() AS count FROM example WHERE Col1 = 5 GROUP BY Col1").ScanStruct(&result); err != nil {
return err
}
完整示例
AppendStruct 允许将结构体追加到现有批次中,并将其视为完整的一行。这要求结构体中的列在名称和类型上都与表一致。虽然所有列都必须有对应的结构体字段,但某些结构体字段可能没有对应的列表达形式。这些字段会被忽略。
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
return err
}
defer batch.Close()
for i := 0; i < 1_000; i++ {
err := batch.AppendStruct(&row{
Col1: uint64(i),
Col2: "Golang SQL database driver",
Col3: []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9},
Col4: time.Now(),
ColIgnored: "this will be ignored",
})
if err != nil {
return err
}
}
完整示例
客户端支持在 Exec、Query 和 QueryRow 方法中绑定参数。如下面的示例所示,支持命名参数、编号参数和位置参数。下文将分别给出示例。
var count uint64
// 位置绑定
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// 数字绑定
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// 命名绑定
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
完整示例
默认情况下,如果将切片作为查询参数传入,它们会展开为以逗号分隔的值列表。如果需要注入一组用 [ ] 包裹的值,应使用 ArraySet。
如果需要组/元组,并且要用 ( ) 包裹,例如用于 IN 运算符,可以使用 GroupSet。这在需要多个组的场景中尤其有用,如下例所示。
最后,DateTime64 字段需要指定精度,才能确保参数被正确渲染。不过,客户端并不知道该字段的精度 level,因此必须由用户提供。为简化这一操作,我们提供了 DateNamed 参数。
var count uint64
// 数组将被展开
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN (?)", []int{100, 200, 300, 400, 500}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array unfolded count: %d\n", count)
// 数组将以 [] 形式保留
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col4 = ?", clickhouse.ArraySet{300, 301}).Scan(&count); err != nil {
return err
}
fmt.Printf("Array count: %d\n", count)
// Group sets 允许我们构造 ( ) 列表
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 IN ?", clickhouse.GroupSet{[]interface{}{100, 200, 300, 400, 500}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// 在需要嵌套时更为实用
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE (Col1, Col5) IN (?)", []clickhouse.GroupSet{{[]interface{}{100, 101}}, {[]interface{}{200, 201}}}).Scan(&count); err != nil {
return err
}
fmt.Printf("Group count: %d\n", count)
// 当需要为时间指定精度时,请使用 DateNamed#
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col3 >= @col3", clickhouse.DateNamed("col3", now.Add(time.Duration(500)*time.Millisecond), clickhouse.NanoSeconds)).Scan(&count); err != nil {
return err
}
fmt.Printf("NamedDate count: %d\n", count)
完整示例
Go 的上下文可用于在 API 边界之间传递截止时间、取消信号以及其他请求范围内的值。连接上的所有方法都将上下文作为第一个参数。虽然前面的示例使用的是 context.Background(),但你也可以利用这一能力传递设置和截止时间,以及取消查询。
传入通过 withDeadline 创建的上下文后,可以为查询设置执行时间限制。请注意,这是一个绝对时间;超时后只会释放连接并向 ClickHouse 发送取消信号。或者,也可以使用 WithCancel 显式取消查询。
辅助函数 clickhouse.WithQueryID 和 clickhouse.WithQuotaKey 可用于指定 Query id 和配额键。Query id 可用于在日志中跟踪查询以及执行取消操作。配额键可用于基于唯一键值对 ClickHouse 的使用施加限制,更多详情请参见配额管理。
你还可以使用上下文,确保某项设置仅应用于特定查询,而不是像连接设置中所示那样应用于整个连接。
最后,你可以通过 clickhouse.WithBlockSize 控制块缓冲区的大小。它会覆盖连接级别的设置 BlockBufferSize,并控制任意时刻解码并保存在内存中的最大块数。更大的值可能意味着更高的并行度,但代价是占用更多内存。
上述内容的示例如下。
dialCount := 0
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
dialCount++
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
})
if err != nil {
return err
}
if err := clickhouse_tests.CheckMinServerServerVersion(conn, 22, 6, 1); err != nil {
return nil
}
// 可以使用 context 将设置传递给特定的 API 调用
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"async_insert": "1",
}))
// 可以使用 context 取消查询
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRow(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// 为查询设置截止时间——到达指定时间点后将取消该查询。
// 查询将在 ClickHouse 中继续执行直至完成
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.Ping(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// 设置查询 id 以便在日志中追踪查询,例如查看 system.query_log
var one uint8
queryId, _ := uuid.NewUUID()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(queryId.String()))
if err = conn.QueryRow(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// 设置配额键——需先创建配额
if err = conn.Exec(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
type Number struct {
Number uint64 `ch:"number"`
}
for i := 1; i <= 6; i++ {
var result []Number
if err = conn.Select(ctx, &result, "SELECT number FROM numbers(10)"); err != nil {
return err
}
}
完整示例
查询时可以请求 Progress、Profile 和日志信息。Progress 信息会报告在 ClickHouse 中已读取和处理的行数及字节数等统计数据。相应地,Profile 信息会汇总返回给客户端的数据,包括字节总量 (未压缩) 、行数和块数。最后,日志信息会提供有关线程的统计信息,例如内存使用情况和数据处理速度。
要获取这些信息,用户需要使用 Context,并向其传递回调函数。
totalRows := uint64(0)
// 使用 context 传递进度和 profile 信息的回调函数
ctx := clickhouse.Context(context.Background(), clickhouse.WithProgress(func(p *clickhouse.Progress) {
fmt.Println("progress: ", p)
totalRows += p.Rows
}), clickhouse.WithProfileInfo(func(p *clickhouse.ProfileInfo) {
fmt.Println("profile info: ", p)
}), clickhouse.WithLogs(func(log *clickhouse.Log) {
fmt.Println("log info: ", log)
}))
rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
if err != nil {
return err
}
for rows.Next() {
}
// 注意:不要跳过 rows.Err() 检查
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("Total Rows: %d\n", totalRows)
rows.Close()
完整示例
您可能需要读取某些表,但并不清楚它们的 schema,或返回字段的类型。这在进行临时数据分析或编写通用工具时很常见。为此,查询响应中会提供列类型信息。可以结合 Go 反射,按正确类型在运行时创建变量实例,并将其传递给 Scan。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.Query(context.Background(), query)
if err != nil {
return err
}
defer rows.Close()
var (
columnTypes = rows.ColumnTypes()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
// 注意:不要跳过 rows.Err() 检查
if err := rows.Err(); err != nil {
return err
}
完整示例
外部表 允许客户端在执行 SELECT 查询时将数据发送到 ClickHouse。这些数据会被放入临时表中,并可在查询本身中用于求值。
要随查询发送外部数据,用户必须先通过 ext.NewTable 构建外部表,然后再通过上下文传递它。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.Query(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// 注意:不要跳过 rows.Err() 检查
if err := rows.Err(); err != nil {
return err
}
rows.Close()
var count uint64
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRow(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
完整示例
ClickHouse 在 TCP 和 HTTP 传输方式下都支持 trace 上下文传播。使用 TCP 时,客户端会将 span 序列化到原生二进制协议中。可使用 clickhouse.WithSpan 通过上下文将 span 附加到查询。
HTTP 传输限制虽然 ClickHouse server 接受标准的 traceparent / tracestate HTTP 请求头,但 clickhouse-go 的 HTTP 传输目前不会发送这些请求头,因此 WithSpan 在 HTTP 下不起作用。作为变通方案,你可以通过连接选项中的 HttpHeaders 手动设置请求头。
var count uint64
rows := conn.QueryRow(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
// 注意:不要跳过 rows.Err() 检查
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
完整示例
有关如何利用链路追踪的完整详情,请参阅 OpenTelemetry 支持。