概述
启动 ClickHouse
# Start a ClickHouse server in the background. With host networking the
# container shares the host's network stack, so ports 8123 and 9000 are
# reachable on localhost without any -p mappings (Docker discards published
# ports in host network mode and prints a warning if they are given).
docker run -d --network host --name clickhouse --ulimit nofile=262144:262144 clickhouse
# Open an interactive clickhouse-client session inside the running container.
docker exec -it clickhouse clickhouse-client
创建表
-- Database that holds the tutorial tables.
CREATE DATABASE taxi;

-- Trip table; the column list mirrors the structure string used by the
-- s3() load that follows.
CREATE TABLE taxi.trips
(
    trip_id               UInt32,
    vendor_id             Enum8(
        '1' = 1, '2' = 2, '3' = 3, '4' = 4,
        'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        '' = 15
    ),
    pickup_date           Date,
    pickup_datetime       DateTime,
    dropoff_date          Date,
    dropoff_datetime      DateTime,
    store_and_fwd_flag    UInt8,
    rate_code_id          UInt8,
    pickup_longitude      Float64,
    pickup_latitude       Float64,
    dropoff_longitude     Float64,
    dropoff_latitude      Float64,
    passenger_count       UInt8,
    trip_distance         Float64,
    fare_amount           Decimal(10, 2),
    extra                 Decimal(10, 2),
    mta_tax               Decimal(10, 2),
    tip_amount            Decimal(10, 2),
    tolls_amount          Decimal(10, 2),
    ehail_fee             Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount          Decimal(10, 2),
    payment_type          Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type             UInt8,
    pickup                FixedString(25),
    dropoff               FixedString(25),
    cab_type              Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    -- NOTE(review): the pickup gid/borocode columns are signed Int8 while the
    -- dropoff counterparts below are UInt8 -- confirm the asymmetry is intended.
    pickup_nyct2010_gid   Int8,
    pickup_ctlabel        Float32,
    pickup_borocode       Int8,
    pickup_ct2010         String,
    pickup_boroct2010     String,
    pickup_cdeligibil     String,
    pickup_ntacode        FixedString(4),
    pickup_ntaname        String,
    pickup_puma           UInt16,
    dropoff_nyct2010_gid  UInt8,
    dropoff_ctlabel       Float32,
    dropoff_borocode      UInt8,
    dropoff_ct2010        String,
    dropoff_boroct2010    String,
    dropoff_cdeligibil    String,
    dropoff_ntacode       FixedString(4),
    dropoff_ntaname       String,
    dropoff_puma          UInt16
)
ENGINE = MergeTree
-- One partition per calendar month of pickup_date.
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
添加数据集
-- Load the sample trips into taxi.trips straight from S3.
-- The {1..2} glob reads trips_1.gz and trips_2.gz; the double-quoted argument
-- is the column structure of the files and lists the same columns, in the
-- same order, as taxi.trips (required because the INSERT uses SELECT *).
-- The statement is terminated with ';' so that the next query is not parsed
-- as part of the SETTINGS clause.
INSERT INTO taxi.trips
SELECT * FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
'TabSeparatedWithNames', "
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0;
SELECT count() FROM taxi.trips;
quit
安装 pg_clickhouse
# Start the pg_clickhouse Postgres image in the background on the host network.
# POSTGRES_PASSWORD is passed to the container as an environment variable.
# (-d is given once; the original repeated it before the image name.)
docker run -d --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
    ghcr.io/clickhouse/pg_clickhouse:18
连接 pg_clickhouse
# Open an interactive psql session as the postgres user inside the container.
docker exec --interactive --tty pg_clickhouse psql --username=postgres
-- Install the pg_clickhouse extension in the current database.
CREATE EXTENSION pg_clickhouse;

-- Foreign server that uses the clickhouse_fdw wrapper to reach the taxi
-- database on localhost through the binary driver.
CREATE SERVER taxi_srv
    FOREIGN DATA WRAPPER clickhouse_fdw
    OPTIONS (
        driver 'binary',
        host   'localhost',
        dbname 'taxi'
    );

-- Map the current Postgres user to the remote user "default".
CREATE USER MAPPING
    FOR CURRENT_USER
    SERVER taxi_srv
    OPTIONS (user 'default');
如果 ClickHouse 用户设置了密码,还需要在上面的 OPTIONS 中添加 password 选项。
现在,添加 taxi 表,只需将远程
ClickHouse 数据库中的所有表导入到一个 Postgres schema 中:
-- Local schema that will hold the imported foreign tables.
CREATE SCHEMA taxi;

-- Import every table of the remote taxi database into that schema.
IMPORT FOREIGN SCHEMA taxi
    FROM SERVER taxi_srv
    INTO taxi;
使用 \det+ 查看它:
taxi=# \det+ taxi.*
List of foreign tables
Schema | Table | Server | FDW options | Description
--------+-------+----------+-----------------------------------------------------------+-------------
taxi | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 row)
使用 \d 查看所有列:
taxi=# \d taxi.trips
Foreign table "taxi.trips"
Column | Type | Collation | Nullable | Default | FDW options
-----------------------+--------------------------+-----------+----------+---------+-------------
trip_id | bigint | | not null | |
vendor_id | text | | not null | |
pickup_date | date | | not null | |
pickup_datetime | timestamp with time zone | | not null | |
dropoff_date | date | | not null | |
dropoff_datetime | timestamp with time zone | | not null | |
store_and_fwd_flag | smallint | | not null | |
rate_code_id | smallint | | not null | |
pickup_longitude | double precision | | not null | |
pickup_latitude | double precision | | not null | |
dropoff_longitude | double precision | | not null | |
dropoff_latitude | double precision | | not null | |
passenger_count | smallint | | not null | |
trip_distance | double precision | | not null | |
fare_amount | numeric(10,2) | | not null | |
extra | numeric(10,2) | | not null | |
mta_tax | numeric(10,2) | | not null | |
tip_amount | numeric(10,2) | | not null | |
tolls_amount | numeric(10,2) | | not null | |
ehail_fee | numeric(10,2) | | not null | |
improvement_surcharge | numeric(10,2) | | not null | |
total_amount | numeric(10,2) | | not null | |
payment_type | text | | not null | |
trip_type | smallint | | not null | |
pickup | character varying(25) | | not null | |
dropoff | character varying(25) | | not null | |
cab_type | text | | not null | |
pickup_nyct2010_gid | smallint | | not null | |
pickup_ctlabel | real | | not null | |
pickup_borocode | smallint | | not null | |
pickup_ct2010 | text | | not null | |
pickup_boroct2010 | text | | not null | |
pickup_cdeligibil | text | | not null | |
pickup_ntacode | character varying(4) | | not null | |
pickup_ntaname | text | | not null | |
pickup_puma | integer | | not null | |
dropoff_nyct2010_gid | smallint | | not null | |
dropoff_ctlabel | real | | not null | |
dropoff_borocode | smallint | | not null | |
dropoff_ct2010 | text | | not null | |
dropoff_boroct2010 | text | | not null | |
dropoff_cdeligibil | text | | not null | |
dropoff_ntacode | character varying(4) | | not null | |
dropoff_ntaname | text | | not null | |
dropoff_puma | integer | | not null | |
Server: taxi_srv
FDW options: (database 'taxi', table_name 'trips', engine 'MergeTree')
SELECT count(*) FROM taxi.trips;
count
---------
1999657
(1 row)
pg_clickhouse 会将整个查询 (包括 COUNT() 聚合) 下推,因此它会在 ClickHouse 上运行,并且只
向 Postgres 返回一行结果。使用 EXPLAIN 查看:
EXPLAIN SELECT count(*) FROM taxi.trips;
QUERY PLAN
-------------------------------------------------
Foreign Scan (cost=1.00..-0.90 rows=1 width=8)
Relations: Aggregate on (trips)
(2 rows)
分析数据
-
计算平均小费金额:
taxi=# \timing
Timing is on.
taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
 round
-------
  1.68
(1 row)

Time: 9.438 ms
-
按乘客人数计算平均费用:
taxi=# SELECT
    passenger_count,
    avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
FROM taxi.trips
GROUP BY passenger_count;
 passenger_count | average_total_amount
-----------------+----------------------
               0 |                22.68
               1 |                15.96
               2 |                17.14
               3 |                16.75
               4 |                17.32
               5 |                16.34
               6 |                16.03
               7 |                59.79
               8 |                36.40
               9 |                 9.79
(10 rows)

Time: 27.266 ms
-
计算每个街区每日的上车次数:
taxi=# SELECT
    pickup_date,
    pickup_ntaname,
    SUM(1) AS number_of_trips
FROM taxi.trips
GROUP BY pickup_date, pickup_ntaname
ORDER BY pickup_date ASC
LIMIT 10;
 pickup_date |         pickup_ntaname         | number_of_trips
-------------+--------------------------------+-----------------
 2015-07-01  | Williamsburg                   |               1
 2015-07-01  | park-cemetery-etc-Queens       |               6
 2015-07-01  | Maspeth                        |               1
 2015-07-01  | Stuyvesant Town-Cooper Village |              44
 2015-07-01  | Rego Park                      |               1
 2015-07-01  | Greenpoint                     |               7
 2015-07-01  | Highbridge                     |               1
 2015-07-01  | Briarwood-Jamaica Hills        |               3
 2015-07-01  | Airport                        |             550
 2015-07-01  | East Harlem North              |              32
(10 rows)

Time: 30.978 ms
-
计算每次行程的时长 (以分钟计) ,然后按
行程时长对结果进行分组:
taxi=# SELECT
    avg(tip_amount) AS avg_tip,
    avg(fare_amount) AS avg_fare,
    avg(passenger_count) AS avg_passenger,
    count(*) AS count,
    -- Trip duration in whole minutes: difference of epoch seconds / 60, rounded.
    round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) AS trip_minutes
FROM taxi.trips
WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
GROUP BY trip_minutes
ORDER BY trip_minutes DESC
LIMIT 5;
      avg_tip      |     avg_fare     |  avg_passenger   | count | trip_minutes
-------------------+------------------+------------------+-------+--------------
              1.96 |                8 |                1 |     1 |        27512
                 0 |               12 |                2 |     1 |        27500
 0.562727272727273 | 17.4545454545455 | 2.45454545454545 |    11 |         1440
 0.716564885496183 | 14.2786259541985 | 1.94656488549618 |   131 |         1439
  1.00945205479452 | 12.8787671232877 | 1.98630136986301 |   146 |         1438
(5 rows)

Time: 45.477 ms
-
按一天中的小时细分,显示每个街区的上客次数:
taxi=# SELECT
    pickup_ntaname,
    date_part('hour', pickup_datetime) AS pickup_hour,
    SUM(1) AS pickups
FROM taxi.trips
WHERE pickup_ntaname != ''
GROUP BY pickup_ntaname, pickup_hour
ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
LIMIT 5;
 pickup_ntaname | pickup_hour | pickups
----------------+-------------+---------
 Airport        |           0 |    3509
 Airport        |           1 |    1184
 Airport        |           2 |     401
 Airport        |           3 |     152
 Airport        |           4 |     213
(5 rows)

Time: 36.895 ms
-
将显示时区设为纽约,并检索前往拉瓜迪亚机场或 JFK
机场的行程:
taxi=# SET timezone = 'America/New_York';
SET
taxi=# SELECT
    pickup_datetime,
    dropoff_datetime,
    total_amount,
    pickup_nyct2010_gid,
    dropoff_nyct2010_gid,
    -- Label the two dropoff ids selected by the WHERE clause.
    CASE
        WHEN dropoff_nyct2010_gid = 138 THEN 'LGA'
        WHEN dropoff_nyct2010_gid = 132 THEN 'JFK'
    END AS airport_code,
    EXTRACT(YEAR FROM pickup_datetime) AS year,
    EXTRACT(DAY FROM pickup_datetime) AS day,
    EXTRACT(HOUR FROM pickup_datetime) AS hour
FROM taxi.trips
WHERE dropoff_nyct2010_gid IN (132, 138)
ORDER BY pickup_datetime
LIMIT 5;
    pickup_datetime     |    dropoff_datetime    | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour
------------------------+------------------------+--------------+---------------------+----------------------+--------------+------+-----+------
 2015-06-30 20:04:14-04 | 2015-06-30 20:15:29-04 |        13.30 |                 -34 |                  132 | JFK          | 2015 |  30 |   20
 2015-06-30 20:09:42-04 | 2015-06-30 20:12:55-04 |         6.80 |                  50 |                  138 | LGA          | 2015 |  30 |   20
 2015-06-30 20:23:04-04 | 2015-06-30 20:24:39-04 |         4.80 |                -125 |                  132 | JFK          | 2015 |  30 |   20
 2015-06-30 20:27:51-04 | 2015-06-30 20:39:02-04 |        14.72 |                -101 |                  138 | LGA          | 2015 |  30 |   20
 2015-06-30 20:32:03-04 | 2015-06-30 20:55:39-04 |        39.34 |                  48 |                  138 | LGA          | 2015 |  30 |   20
(5 rows)

Time: 17.450 ms
创建字典
LocationID 列会映射到 trips 表中的 pickup_nyct2010_gid 和
dropoff_nyct2010_gid 列:
| LocationID | Borough | Zone | service_zone |
|---|---|---|---|
| 1 | EWR | Newark Airport | EWR |
| 2 | Queens | Jamaica Bay | Boro Zone |
| 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
| 4 | Manhattan | Alphabet City | Yellow Zone |
| 5 | Staten Island | Arden Heights | Boro Zone |
-
仍在 Postgres 中,使用
clickhouse_raw_query 函数创建一个名为 taxi_zone_dictionary 的 ClickHouse 字典,并通过 S3 中的 CSV 文件为该字典填充数据:
-- The attribute names follow the CSV header shown in the table above
-- (LocationID, Borough, Zone, service_zone), because the source is read as
-- CSVWithNames; "Zone" is capitalised to match that header and the column
-- name shown when the dictionary is queried.
SELECT clickhouse_raw_query($$
    CREATE DICTIONARY taxi.taxi_zone_dictionary (
        LocationID Int64 DEFAULT 0,
        Borough String,
        Zone String,
        service_zone String
    )
    PRIMARY KEY LocationID
    SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames'))
    LIFETIME(MIN 0 MAX 0)
    LAYOUT(HASHED_ARRAY())
$$, 'host=localhost dbname=taxi');
将
LIFETIME 设为 0 会禁用自动更新,从而避免对我们的
S3 bucket 产生不必要的流量。在其他情况下,你可能需要采用不同的配置。
详情请参见“使用 LIFETIME 刷新字典数据”。
- 现在导入它:
-- Import only the dictionary from the remote taxi database.
IMPORT FOREIGN SCHEMA taxi
    LIMIT TO (taxi_zone_dictionary)
    FROM SERVER taxi_srv
    INTO taxi;
- 确认可以查询它:
taxi=# SELECT * FROM taxi.taxi_zone_dictionary LIMIT 3;
LocationID | Borough | Zone | service_zone
------------+-----------+-----------------------------------------------+--------------
77 | Brooklyn | East New York/Pennsylvania Avenue | Boro Zone
106 | Brooklyn | Gowanus | Boro Zone
103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone
(3 rows)
- 很好。现在在查询中使用
dictGet 函数来获取某个行政区的名称。下面这个查询会汇总终点为 LaGuardia 或 JFK 机场的各行政区出租车行程数量:
taxi=# SELECT
    count(1) AS total,
    -- Look up the pickup location's Borough in the dictionary; an empty
    -- lookup result is reported as 'Unknown'.
    COALESCE(
        NULLIF(
            dictGet('taxi.taxi_zone_dictionary', 'Borough', toUInt64(pickup_nyct2010_gid)),
            ''
        ),
        'Unknown'
    ) AS borough_name
FROM taxi.trips
WHERE dropoff_nyct2010_gid IN (132, 138)
GROUP BY borough_name
ORDER BY total DESC;
total | borough_name
-------+---------------
23683 | Unknown
7053 | Manhattan
6828 | Brooklyn
4458 | Queens
2670 | Bronx
554 | Staten Island
53 | EWR
(7 rows)
Time: 66.245 ms
执行 join
接下来,将 taxi_zone_dictionary 与你的 trips
表进行连接。
-
先从一个简单的
JOIN 开始,它的作用与上面的机场查询类似:
taxi=# SELECT
    count(1) AS total,
    "Borough"
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
    ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
-- pickup_nyct2010_gid > 0 keeps only rows with a positive pickup id.
WHERE pickup_nyct2010_gid > 0
    AND dropoff_nyct2010_gid IN (132, 138)
GROUP BY "Borough"
ORDER BY total DESC;
 total |    Borough
-------+---------------
  7053 | Manhattan
  6828 | Brooklyn
  4458 | Queens
  2670 | Bronx
   554 | Staten Island
    53 | EWR
(6 rows)

Time: 48.449 ms
请注意,上述
JOIN 查询的输出与上面的 dictGet
查询相同 (只是未包含 Unknown 值) 。在底层,
ClickHouse 实际上会为 taxi_zone_dictionary 字典调用
dictGet 函数,但 JOIN 语法对 SQL 开发者来说更熟悉。
taxi=# EXPLAIN SELECT
    count(1) AS total,
    "Borough"
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
    ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
WHERE pickup_nyct2010_gid > 0
    AND dropoff_nyct2010_gid IN (132, 138)
GROUP BY "Borough"
ORDER BY total DESC;
QUERY PLAN
-----------------------------------------------------------------------
Foreign Scan (cost=1.00..5.10 rows=1000 width=40)
Relations: Aggregate on ((trips) INNER JOIN (taxi_zone_dictionary))
(2 rows)
Time: 2.012 ms
-
此查询会返回小费金额最高的 1000 次行程对应的行,
然后将每一行与该字典进行内连接:
taxi=# SELECT *  -- demo only: list just the columns you need in real queries
FROM taxi.trips
INNER JOIN taxi.taxi_zone_dictionary
    ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID"
WHERE tip_amount > 0
ORDER BY tip_amount DESC
LIMIT 1000;
通常,我们会避免在 PostgreSQL 和 ClickHouse 中使用
SELECT *。你
只应检索实际需要的列。