本节重点介绍如何在使用 S3 表函数 从 S3 读取和插入数据时优化性能。
在通过调整线程数和块大小来提升插入性能之前,我们建议用户先了解 S3 插入的工作机制。如果你已经熟悉插入机制,或者只想快速查看一些建议,请直接跳转到下方的示例。
除了硬件规格外,还有两个主要因素会影响 ClickHouse 在单节点上的数据插入性能和资源占用:插入块大小和插入并行度。
执行 INSERT INTO SELECT 时,ClickHouse 会接收一部分数据,并基于接收到的数据按分区键 ① 形成 (至少) 一个内存中的插入块。随后,该块中的数据会被排序,并应用表引擎特定的优化。然后,数据会被压缩,并以新的数据分区片段形式 ② 写入数据库存储。
插入块大小会同时影响 ClickHouse 服务器的磁盘文件 I/O 使用情况和内存使用量。更大的插入块会占用更多内存,但会生成更大且数量更少的初始 parts。ClickHouse 在加载大量数据时,需要创建的 parts 越少,所需的磁盘文件 I/O 和自动后台合并就越少。
当 INSERT INTO SELECT 查询与集成表引擎或表函数结合使用时,数据由 ClickHouse 服务器拉取:
在数据完全加载之前,服务器会执行一个循环:
① 拉取并解析下一批数据,从中构建内存中的数据块(每个分区键对应一个)。
② 将该数据块作为新的数据分片写入存储。
跳转至 ①
在 ① 中,其大小取决于插入块大小,可通过以下两个设置控制:
当 insert block 中收集到指定数量的行,或达到配置的数据量时 (以先发生者为准) ,就会触发将该块写入一个新的 part。随后,插入循环会继续执行步骤 ①。
请注意,min_insert_block_size_bytes 的值表示未压缩的内存中块大小 (而不是磁盘上压缩后的 part 大小) 。另外,还要注意,生成的块和 part 很少会精确包含配置的行数或字节数,因为 ClickHouse 会按行-块方式对数据进行流式传输和处理。因此,这些设置指定的是最小阈值。
配置的插入块大小越小,大规模数据加载时创建的初始 parts 就越多,并且在数据摄取过程中并发执行的后台 part merge 也会越多。这可能导致资源争用 (CPU 和内存) ,并且在摄取完成后,还需要额外的时间才能让 parts 数量降到健康 (3000) 水平。
如果 part 数量超过建议限制,ClickHouse 查询性能会受到负面影响。
ClickHouse 会持续将 merge parts 合并成更大的 parts,直到其达到约 150 GiB 的 compressed size。下图展示了 ClickHouse server 如何合并 parts:
单个 ClickHouse server 会使用多个 background merge threads 来并发执行 part merges。每个线程都会执行一个循环:
① 决定接下来合并哪些 parts,并将这些 parts 作为块加载到内存中。
② 将内存中已加载的块合并为一个更大的块。
③ 将合并后的块写入磁盘上的新 part。
跳转至 ①
请注意,增加 CPU 核心数量和 RAM 容量会提升后台合并吞吐量。
已合并成更大 parts 的 parts 会被标记为非活动,并在可配置的若干分钟后最终删除。随着时间推移,这会形成一棵由已合并 parts 构成的树 (这也是 MergeTree 表名称的由来) 。
ClickHouse 服务器可以并行处理并插入数据。插入并行度会影响 ClickHouse 服务器的摄取吞吐量和内存使用量。并行加载和处理数据需要更多主内存,但由于处理速度更快,也会提升摄取吞吐量。
s3 等表函数支持通过 glob 模式指定一组待加载的文件名。当某个 glob 模式匹配到多个现有文件时,ClickHouse 可以利用并行运行的插入线程 (每台服务器) 在这些文件之间及文件内部并行读取,并将数据并行插入表中:
在所有文件中的全部数据处理完成之前,每个插入线程都会执行一个循环:
① 获取下一批未处理的文件数据(批次大小基于已配置的块大小),并从中创建内存数据块。
② 将该数据块写入存储中的新分片。
跳转至 ①。
此类并行插入线程的数量可通过 max_insert_threads 设置进行配置。对于开源 ClickHouse,默认值为 1;对于 ClickHouse Cloud,默认值为 4。
当文件数量很多时,多个插入线程并行处理的效果很好。它可以充分利用可用的 CPU 核心和网络带宽 (用于并行下载文件) 。在只有少量大文件需要加载到表中的场景下,ClickHouse 会自动实现较高的数据处理并行度,并通过为每个插入线程额外派生读取线程,并行读取 (下载) 大文件中更多不同区间的数据,从而优化网络带宽利用率。
对于 s3 函数和表,单个文件是否会并行下载取决于 max_download_threads 和 max_download_buffer_size 的取值。只有当文件大小大于 2 * max_download_buffer_size 时,才会并行下载。默认情况下,max_download_buffer_size 设置为 10MiB。在某些情况下,你可以放心地将该缓冲区大小增大到 50 MB (max_download_buffer_size=52428800) ,以确保每个文件仅由单个线程下载。这样可以减少每个线程花在发起 S3 调用上的时间,从而降低 S3 等待时间。此外,对于小到无法并行读取的文件,为了提高吞吐量,ClickHouse 还会通过异步预读这些文件来自动预取数据。
无论是直接对原位数据执行查询——即仅使用 ClickHouse 计算资源、数据仍以原始格式保留在 S3 中的临时查询场景——还是将数据从 S3 插入 ClickHouse MergeTree 表引擎,都需要优化使用 S3 表函数时的查询性能。除非另有说明,以下建议适用于这两种场景。
可用的 CPU 核心数和 RAM 大小会影响:
因此,也会影响整体摄取吞吐量。
请确保您的存储桶与 ClickHouse 实例位于同一区域。这个简单的优化就能显著提升吞吐量,尤其是在 AWS 基础设施上部署 ClickHouse 实例时。
ClickHouse 可以使用 s3 函数和 S3 引擎,读取存储在 S3 存储桶中且采用支持的格式的文件。如果读取的是原始文件,其中某些格式具有明显优势:
- 对于包含列名信息的格式 (如 Native、Parquet、CSVWithNames 和 TabSeparatedWithNames) ,查询会更简洁,因为用户无需在
s3 函数中指定列名。系统可以根据文件中的列名推断这些信息。
- 不同格式在读写吞吐量方面的性能各不相同。Native 和 Parquet 在读取性能上最优,因为它们本身就是列式格式,且更加紧凑。Native 格式还与 ClickHouse 在内存中存储数据的方式保持一致,因此在将数据流式写入 ClickHouse 时可进一步降低处理开销。
- 块大小通常会影响大文件读取的延迟。如果你只对数据进行采样,例如返回前 N 行,这一点会尤为明显。对于 CSV 和 TSV 这类格式,必须先解析文件才能返回一组行。因此,Native 和 Parquet 这类格式可以实现更快的采样。
- 每种压缩格式都有各自的优缺点,通常需要在压缩率与速度之间权衡,并且会偏重压缩性能或解压性能。如果压缩 CSV 或 TSV 这类原始文件,lz4 可提供最快的解压性能,但会牺牲压缩率。Gzip 通常压缩效果更好,但读取速度会稍慢一些。Xz 则更进一步,通常能提供最佳压缩率,但压缩和解压性能也是最慢的。如果用于导出,Gz 和 lz4 的压缩速度大致相当。你需要结合连接速度来权衡。解压或压缩速度提升带来的收益,很容易被连接到 S3 存储桶的较慢网络所抵消。
- 对于 Native 或 Parquet 这类格式,压缩带来的额外开销通常并不划算。由于这些格式本身就很紧凑,能够节省的数据量通常非常有限。压缩和解压所花费的时间很少能抵消网络传输时间——尤其是考虑到 S3 在全球范围内可用,并且通常具有较高的网络带宽。
为了进一步说明还可以做哪些优化,我们将使用 Stack Overflow 数据集中的帖子,并对这些数据的查询和插入性能进行优化。
该数据集由 189 个 Parquet 文件组成,从 2008 年 7 月到 2024 年 3 月,每个月对应一个文件。
请注意,出于性能考虑,我们按照上文建议使用 Parquet,并在与存储桶位于同一区域的 ClickHouse 集群上执行所有查询。该集群有 3 个节点,每个节点配备 32 GiB RAM 和 8 个 vCPU。
在不进行任何调优的情况下,我们将展示把该数据集插入 MergeTree 表引擎的性能,以及执行查询来统计提问最多用户的性能。这两项操作都刻意要求对数据进行全表扫描。
-- 用户名排行
SELECT
OwnerDisplayName,
count() AS num_posts
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
WHERE OwnerDisplayName NOT IN ('', 'anon')
GROUP BY OwnerDisplayName
ORDER BY num_posts DESC
LIMIT 5
┌─OwnerDisplayName─┬─num_posts─┐
│ user330315 │ 10344 │
│ user4039065 │ 5316 │
│ user149341 │ 4102 │
│ user529758 │ 3700 │
│ user3559349 │ 3068 │
└──────────────────┴───────────┘
5 rows in set. Elapsed: 3.013 sec. Processed 59.82 million rows, 24.03 GB (19.86 million rows/s., 7.98 GB/s.)
峰值内存占用: 603.64 MiB.
-- 加载到 posts 表
INSERT INTO posts SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
0 rows in set. Elapsed: 191.692 sec. Processed 59.82 million rows, 24.03 GB (312.06 thousand rows/s., 125.37 MB/s.)
在本示例中,我们只返回少量行。如果要衡量 SELECT 查询的性能,而查询会向客户端返回大量数据,请对查询使用 null 格式,或将结果导向 Null 引擎。这样可以避免客户端被大量数据压垮,并防止网络饱和。
在不受网络带宽或本地 I/O 限制的前提下,S3 上的读取性能会随 CPU 核心数线性扩展。增加线程数也会带来额外的内存开销,需要对此有所了解。以下设置可进行调整,以在某些情况下提升读取吞吐性能:
- 通常,
max_threads 的默认值已经足够,也就是核心数。如果某个查询占用的内存较高且需要降低,或者结果的 LIMIT 较小,则可以将该值调低。内存充足的用户也可以尝试增大该值,以获得更高的 S3 读取吞吐。通常这只会在核心数较少的机器上带来收益,即 < 10。随着其他资源成为瓶颈 (例如网络和 CPU 争用) ,进一步并行化带来的收益通常会逐渐减弱。
- 22.3.1 之前的 ClickHouse 版本,在使用
s3 函数或 S3 表引擎时,只会在多个文件之间并行读取。这要求用户确保文件在 S3 上被拆分为多个分块,并结合 glob pattern 进行读取,才能获得最佳读取性能。后续版本已支持在单个文件内部并行下载。
- 在线程数较少的场景中,将
remote_filesystem_read_method 设置为 “read”,以便从 S3 同步读取文件,可能会带来收益。
- 对于
s3 函数和表,单个文件的并行下载由 max_download_threads 和 max_download_buffer_size 的值决定。虽然 max_download_threads 控制使用的线程数,但只有当文件大小大于 2 * max_download_buffer_size 时,文件才会并行下载。默认情况下,max_download_buffer_size 的默认值为 10MiB。在某些情况下,你可以放心地将该缓冲区大小增大到 50 MB (max_download_buffer_size=52428800) ,目的是确保较小的文件只由单个线程下载。这可以减少每个线程发起 S3 调用所花费的时间,从而降低 S3 等待时间。示例可参见这篇博客文章。
在为提升性能做出任何更改之前,请务必先进行适当的测量。由于 S3 API 调用对延迟较为敏感,并且可能影响客户端计时,请使用查询日志获取性能指标,即 system.query_log。
以我们前面的查询为例,将 max_threads 翻倍到 16 (默认 max_threads 为节点上的核心数) ,可以在增加内存开销的前提下,将读取查询性能提升 2 倍。进一步增大 max_threads 的收益则会如图所示逐渐递减。
SELECT
OwnerDisplayName,
count() AS num_posts
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
WHERE OwnerDisplayName NOT IN ('', 'anon')
GROUP BY OwnerDisplayName
ORDER BY num_posts DESC
LIMIT 5
SETTINGS max_threads = 16
┌─OwnerDisplayName─┬─num_posts─┐
│ user330315 │ 10344 │
│ user4039065 │ 5316 │
│ user149341 │ 4102 │
│ user529758 │ 3700 │
│ user3559349 │ 3068 │
└──────────────────┴───────────┘
5 rows in set. Elapsed: 1.505 sec. Processed 59.82 million rows, 24.03 GB (39.76 million rows/s., 15.97 GB/s.)
峰值内存占用: 178.58 MiB.
SETTINGS max_threads = 32
5 rows in set. Elapsed: 0.779 sec. Processed 59.82 million rows, 24.03 GB (76.81 million rows/s., 30.86 GB/s.)
峰值内存占用: 369.20 MiB.
SETTINGS max_threads = 64
5 rows in set. Elapsed: 0.674 sec. Processed 59.82 million rows, 24.03 GB (88.81 million rows/s., 35.68 GB/s.)
峰值内存占用: 639.99 MiB.
为了获得最高的摄取性能,你需要根据可用的 CPU 核心数和 RAM 容量,选择 (1) 插入块大小,以及 (2) 合适的插入并行度。总结如下:
这两个性能因素之间存在相互制约的权衡关系 (此外,还需要与后台 part 合并进行权衡) 。ClickHouse server 可用的主内存是有限的。更大的块会占用更多主内存,从而限制我们能够使用的并行插入线程数量。反过来,并行插入线程数越多,所需的主内存也越多,因为插入线程的数量决定了内存中同时创建的插入块数量。这又会限制插入块可能达到的大小。此外,插入线程与后台合并线程之间还可能发生资源争用。配置较多的插入线程会 (1) 创建更多需要合并的 parts,并且 (2) 挤占原本可供后台合并线程使用的 CPU 核心和内存空间。
如需详细了解这些参数如何影响性能和资源,我们建议阅读这篇博客文章。正如文中所述,调优通常需要在这两个参数之间谨慎权衡。由于这种穷尽式测试往往并不现实,因此总结来说,我们建议:
• max_insert_threads: 选择约一半的可用 CPU 核心用于插入线程(以便为后台合并保留足够的专用核心)
• peak_memory_usage_in_bytes: 选择预期的峰值内存占用;可以是全部可用 RAM(若为独立摄取场景),或一半及以下(以便为其他并发任务留出空间)
Then:
min_insert_block_size_bytes = peak_memory_usage_in_bytes / (~3 * max_insert_threads)
使用此公式时,你可以将 min_insert_block_size_rows 设为 0 (以禁用基于行数的阈值) ,同时将 max_insert_threads 设为所选值,并将 min_insert_block_size_bytes 设为根据上述公式计算出的结果。
将此公式应用到我们前面的 Stack Overflow 示例中:
max_insert_threads=4 (每个节点 8 个核心)
peak_memory_usage_in_bytes - 32 GiB (节点资源的 100%) ,即 34359738368 字节。
min_insert_block_size_bytes = 34359738368/(3*4) = 2863311530
INSERT INTO posts SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet') SETTINGS min_insert_block_size_rows=0, max_insert_threads=4, min_insert_block_size_bytes=2863311530
0 rows in set. Elapsed: 128.566 sec. Processed 59.82 million rows, 24.03 GB (465.28 thousand rows/s., 186.92 MB/s.)
如图所示,调整这些设置后,insert 性能提升了 33% 以上。至于能否进一步提升单节点性能,就留给读者自行探索了。
通过资源和节点扩缩容既适用于读取查询,也适用于插入查询。
前面所有调优和查询都只使用了 ClickHouse Cloud 集群中的单个节点。很多情况下,你也会有多个 ClickHouse 节点可用。我们建议用户在初期优先进行垂直扩缩容,因为 S3 吞吐量会随核心数量线性提升。如果我们在资源翻倍 (64GiB、16 个 vCPU) 的更大 ClickHouse Cloud 节点上,使用适当的设置重复之前的插入和读取查询,两者的执行速度都大约会达到原来的两倍。
INSERT INTO posts SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet') SETTINGS min_insert_block_size_rows=0, max_insert_threads=8, min_insert_block_size_bytes=2863311530
0 rows in set. Elapsed: 67.294 sec. Processed 59.82 million rows, 24.03 GB (888.93 thousand rows/s., 357.12 MB/s.)
SELECT
OwnerDisplayName,
count() AS num_posts
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
WHERE OwnerDisplayName NOT IN ('', 'anon')
GROUP BY OwnerDisplayName
ORDER BY num_posts DESC
LIMIT 5
SETTINGS max_threads = 92
5 rows in set. Elapsed: 0.421 sec. Processed 59.82 million rows, 24.03 GB (142.08 million rows/s., 57.08 GB/s.)
单个节点也可能受网络带宽和 S3 GET 请求限制,因而无法通过纵向扩容实现性能的线性提升。
归根结底,出于硬件可用性和成本效益的考虑,通常需要进行水平扩展。在 ClickHouse Cloud 中,生产集群至少有 3 个节点。因此,你可能也希望在执行插入时利用所有节点。
要使用集群进行 S3 读取,需要像 利用集群 中所述那样使用 s3Cluster 函数。这样可将读取分布到各个节点上。
最先接收插入查询的服务器会先解析 glob 模式,然后将每个匹配文件的处理动态分派给自身及其他服务器。
我们重复之前的读取查询,通过将查询调整为使用 s3Cluster,把工作负载分布到 3 个节点上。在 ClickHouse Cloud 中,只需引用 default 集群,此过程会自动完成。
如 利用集群 中所述,这项工作是在文件级别分发的。要从此功能中受益,你需要有足够数量的文件,即文件数至少要 > 节点数。
SELECT
OwnerDisplayName,
count() AS num_posts
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
WHERE OwnerDisplayName NOT IN ('', 'anon')
GROUP BY OwnerDisplayName
ORDER BY num_posts DESC
LIMIT 5
SETTINGS max_threads = 16
┌─OwnerDisplayName─┬─num_posts─┐
│ user330315 │ 10344 │
│ user4039065 │ 5316 │
│ user149341 │ 4102 │
│ user529758 │ 3700 │
│ user3559349 │ 3068 │
└──────────────────┴───────────┘
5 rows in set. Elapsed: 0.622 sec. Processed 59.82 million rows, 24.03 GB (96.13 million rows/s., 38.62 GB/s.)
峰值内存占用: 176.74 MiB.
同样,我们的插入查询也可以采用分布式方式,并沿用前面为单节点确定的优化设置:
INSERT INTO posts SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet') SETTINGS min_insert_block_size_rows=0, max_insert_threads=4, min_insert_block_size_bytes=2863311530
0 rows in set. Elapsed: 171.202 sec. Processed 59.82 million rows, 24.03 GB (349.41 thousand rows/s., 140.37 MB/s.)
读者会注意到,读取文件提升了查询性能,但并未改善插入性能。默认情况下,虽然读取会通过 s3Cluster 分布到各个节点,但插入仍会在发起节点上执行。这意味着,尽管读取会在每个节点上进行,生成的行仍会被路由到发起节点再做分发。在高吞吐量场景下,这可能会成为瓶颈。为解决这一问题,请为 s3cluster 函数设置参数 parallel_distributed_insert_select。
将其设置为 parallel_distributed_insert_select=2 可确保 SELECT 和 INSERT 都会在每个分片的各个节点上、直接针对该节点上 Distributed 引擎底层表执行。
INSERT INTO posts
SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
SETTINGS parallel_distributed_insert_select = 2, min_insert_block_size_rows=0, max_insert_threads=4, min_insert_block_size_bytes=2863311530
0 rows in set. Elapsed: 54.571 sec. Processed 59.82 million rows, 24.03 GB (1.10 million rows/s., 440.38 MB/s.)
峰值内存占用: 11.75 GiB.
正如预期,这会使插入性能下降到原来的 1/3。
写入操作有时会因超时等错误而失败。写入失败时,数据可能已经成功写入,也可能尚未成功写入。为使客户端能够安全地重试写入,默认情况下,在 ClickHouse Cloud 等分布式部署中,ClickHouse 会尝试判断数据是否已经成功写入。如果写入的数据被标记为重复,ClickHouse 就不会再将其写入目标表。不过,用户仍会收到成功的操作状态,就像数据已正常写入一样。
这种行为会带来额外的写入开销,因此在从客户端加载数据或按批次加载时是合理的;但在从对象存储执行 INSERT INTO SELECT 时,则可能没有必要。通过在写入时禁用此功能,我们可以像下文所示那样提升性能:
INSERT INTO posts
SETTINGS parallel_distributed_insert_select = 2, min_insert_block_size_rows = 0, max_insert_threads = 4, min_insert_block_size_bytes = 2863311530, insert_deduplicate = 0
SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
SETTINGS parallel_distributed_insert_select = 2, min_insert_block_size_rows = 0, max_insert_threads = 4, min_insert_block_size_bytes = 2863311530, insert_deduplicate = 0
0 rows in set. Elapsed: 52.992 sec. Processed 59.82 million rows, 24.03 GB (1.13 million rows/s., 453.50 MB/s.)
峰值内存占用: 26.57 GiB.
在 ClickHouse 中,optimize_on_insert 设置用于控制是否在插入过程中合并数据分区片段。启用时 (默认 optimize_on_insert = 1) ,较小的数据分区片段会在写入时合并为较大的分区片段,从而减少查询时需要读取的 parts 数量,提升查询性能。不过,这种合并也会给插入过程带来额外开销,可能导致高吞吐量写入变慢。
禁用此设置 (optimize_on_insert = 0) 后,会跳过插入期间的合并,让数据写入速度更快,尤其适用于频繁的小批量插入。合并过程会延后到后台执行,因此虽然插入性能会更好,但小分区片段的数量会暂时增加,在后台合并完成前,查询速度可能会变慢。当插入性能是首要考虑,且后续可由后台合并高效完成优化时,此设置就非常适合。如下面所示,禁用该设置可以提高插入吞吐量:
SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/by_month/*.parquet')
SETTINGS parallel_distributed_insert_select = 2, min_insert_block_size_rows = 0, max_insert_threads = 4, min_insert_block_size_bytes = 2863311530, insert_deduplicate = 0, optimize_on_insert = 0
0 rows in set. Elapsed: 49.688 sec. Processed 59.82 million rows, 24.03 GB (1.20 million rows/s., 483.66 MB/s.)
- 在低内存场景下,如果向 S3 插入数据,建议考虑降低
max_insert_delayed_streams_for_parallel_write。