On 21st of March, Robert Haas committed a patch:
Support parallel aggregation. Parallel workers can now partially aggregate the data and pass the transition values back to the leader, which can combine the partial results to produce the final answer. David Rowley, based on earlier work by Haribabu Kommi. Reviewed by Álvaro Herrera, Tomas Vondra, Amit Kapila, James Sewell, and me.
This is big.
Some time ago, we got parallelizable sequential scans. Then, two months later, parallel joins.
Now, it's aggregates.
How does it work?
Let's create a test table and see what we can do with it:
$ CREATE TABLE test AS
    SELECT
        i AS id,
        now() - random() * '5 years'::INTERVAL AS ts,
        (random() * 100000000)::int4 AS NUMBER,
        repeat('x', (10 + 40 * random())::int4) AS txt
    FROM generate_series(1, 10000000) i;
SELECT 10000000

$ ALTER TABLE test ADD PRIMARY KEY (id);
ALTER TABLE
First, let's see whether parallelization works and, if so, how to spot it. Sanity check:
$ EXPLAIN analyze SELECT MIN(ts) FROM test;
                                                        QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=226506.30..226506.31 ROWS=1 width=8) (actual TIME=1279.564..1279.565 ROWS=1 loops=1)
   ->  Seq Scan ON test  (cost=0.00..201505.84 ROWS=10000184 width=8) (actual TIME=0.032..664.339 ROWS=10000000 loops=1)
 Planning TIME: 0.624 ms
 Execution TIME: 1279.630 ms
(4 ROWS)

$ SET max_parallel_degree = 2;
SET

$ EXPLAIN analyze SELECT MIN(ts) FROM test;
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=154588.51..154588.52 ROWS=1 width=8) (actual TIME=476.593..476.593 ROWS=1 loops=1)
   ->  Gather  (cost=154588.29..154588.50 ROWS=2 width=8) (actual TIME=476.539..476.588 ROWS=3 loops=1)
         NUMBER OF Workers: 2
         ->  Partial Aggregate  (cost=153588.29..153588.30 ROWS=1 width=8) (actual TIME=471.074..471.074 ROWS=1 loops=3)
               ->  Parallel Seq Scan ON test  (cost=0.00..143171.43 ROWS=4166743 width=8) (actual TIME=0.040..240.595 ROWS=3333333 loops=3)
 Planning TIME: 0.148 ms
 Execution TIME: 477.425 ms
(7 ROWS)
OK. We can now see that when parallelization is enabled (max_parallel_degree > 0), the plan is clearly different, and shows information about workers, gathering, and finalizing.
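The shape of the plan tells the story: each worker runs a Partial Aggregate over its slice of the table, Gather collects the transition values, and Finalize Aggregate combines them into the final answer. A toy Python sketch of that control flow (not PostgreSQL's actual implementation, just the idea, illustrated with min()):

```python
from datetime import datetime, timedelta

# Simulated table: one timestamp per row.
rows = [datetime(2016, 1, 1) + timedelta(hours=i) for i in range(10_000)]

def chunks(seq, n):
    """Split the scanned rows among n workers, like a Parallel Seq Scan does."""
    k = len(seq) // n
    return [seq[i * k:((i + 1) * k if i < n - 1 else len(seq))] for i in range(n)]

# Each worker runs a "Partial Aggregate" over its own chunk,
# producing a transition value (here: the chunk's minimum).
partial_states = [min(chunk) for chunk in chunks(rows, 3)]

# "Gather" hands the transition values to the leader, and
# "Finalize Aggregate" combines them into the final result.
final = min(partial_states)

assert final == min(rows)
```

For min() the combine step is trivially the same operation again; for other aggregates (like avg(), below) the transition value has to carry more state.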
I tested it for min(ts). Can we also do average? sum?
$ EXPLAIN analyze SELECT avg(NUMBER) FROM test;
                                                        QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=226506.30..226506.31 ROWS=1 width=32) (actual TIME=1241.411..1241.411 ROWS=1 loops=1)
   ->  Seq Scan ON test  (cost=0.00..201505.84 ROWS=10000184 width=4) (actual TIME=0.017..595.584 ROWS=10000000 loops=1)
 Planning TIME: 0.241 ms
 Execution TIME: 1241.460 ms
(4 ROWS)

$ EXPLAIN analyze SELECT SUM(LENGTH(txt)) FROM test;
                                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=165005.37..165005.38 ROWS=1 width=8) (actual TIME=730.439..730.439 ROWS=1 loops=1)
   ->  Gather  (cost=165005.15..165005.36 ROWS=2 width=8) (actual TIME=730.401..730.435 ROWS=3 loops=1)
         NUMBER OF Workers: 2
         ->  Partial Aggregate  (cost=164005.15..164005.16 ROWS=1 width=8) (actual TIME=728.536..728.536 ROWS=1 loops=3)
               ->  Parallel Seq Scan ON test  (cost=0.00..143171.43 ROWS=4166743 width=31) (actual TIME=0.012..217.970 ROWS=3333333 loops=3)
 Planning TIME: 0.177 ms
 Execution TIME: 731.256 ms
(7 ROWS)
OK. Looks like we can't do parallel avg(), but this can be worked around:
$ EXPLAIN analyze SELECT SUM(NUMBER)::float8 / COUNT(NUMBER)::float8 FROM test;
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=165005.37..165005.39 ROWS=1 width=8) (actual TIME=481.096..481.096 ROWS=1 loops=1)
   ->  Gather  (cost=165005.15..165005.36 ROWS=2 width=16) (actual TIME=481.047..481.076 ROWS=3 loops=1)
         NUMBER OF Workers: 2
         ->  Partial Aggregate  (cost=164005.15..164005.16 ROWS=1 width=16) (actual TIME=479.455..479.455 ROWS=1 loops=3)
               ->  Parallel Seq Scan ON test  (cost=0.00..143171.43 ROWS=4166743 width=4) (actual TIME=0.023..211.117 ROWS=3333333 loops=3)
 Planning TIME: 0.196 ms
 Execution TIME: 481.936 ms
(7 ROWS)
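The workaround works because sum() and count() each have an obvious way to combine partial states, and the division happens exactly once at the end. A toy Python sketch of the same idea, with a (sum, count) pair playing the role of avg()'s transition state:

```python
data = list(range(1, 1_000_001))

def worker_partial(chunk):
    # The transition state for an average: a (sum, count) pair.
    return (sum(chunk), len(chunk))

# Four "workers", each aggregating its own slice of the data.
n_workers = 4
size = len(data) // n_workers
states = [worker_partial(data[i * size:(i + 1) * size]) for i in range(n_workers)]

# Combine step on the leader: add up the sums and the counts...
total, count = map(sum, zip(*states))

# ...and finalize with a single division.
avg = total / count
```

Note that averaging the per-worker averages directly would only be correct if every chunk had exactly the same size; carrying the (sum, count) pair avoids that trap.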
OK. Now, for the million dollar question: is it fast?
To test it, I will run the "avg" workaround query from above with various max_parallel_degree settings, 10 times each, and pick the best time for every max_parallel_degree. My desktop has four cores, so we should stop seeing improvement after max_parallel_degree = 3.
Results:
max_parallel_degree | best time
---|---
0 | 976.813 ms
1 | 507.961 ms
2 | 352.671 ms
3 | 284.120 ms
4 | 274.086 ms
5 | 281.515 ms
6 | 280.768 ms
7 | 279.903 ms
8 | 279.399 ms
9 | 279.838 ms
This is pretty good. First of all, the speedup is immense. Second, it's almost perfect parallelization. This is absolutely amazing.
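For the curious, speedup and per-process efficiency can be derived from the table above. A small Python sketch, assuming the leader backend also participates in the scan (so max_parallel_degree = N means N + 1 processes doing work, which matches the loops=3 seen in the plans with 2 workers):

```python
# Best times (ms) from the benchmark table above.
best_ms = {0: 976.813, 1: 507.961, 2: 352.671, 3: 284.120}

baseline = best_ms[0]
for degree, ms in best_ms.items():
    if degree == 0:
        continue
    processes = degree + 1  # leader participates, so degree N ~ N + 1 processes
    speedup = baseline / ms
    efficiency = speedup / processes
    print(f"degree={degree}: speedup={speedup:.2f}x, efficiency={efficiency:.1%}")
```

On my numbers this comes out well above 85% efficiency up to degree 3, which is what "almost perfect parallelization" looks like on a four-core box.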
The only gripe I have is the lack of documentation. While checking this, I found an undocumented column (aggcombinefn) in pg_aggregate (not sure if it's related or not), and I didn't find any information on how to make my own aggregates parallelizable. That would be interesting: creating your own aggregates is very simple in Pg, so it would be really cool if they could be defined in such a way that parallelization would work.
Small gripe about docs aside, it's a great patch that will definitely be welcome in many cases. Thanks a lot.
Hi,
Thanks for the post!
Just a couple of notes: if I do 976.813 / 2 / 507.961, I come back to 96.15% efficiency for 1 worker vs. 0 workers. That's good, but I found it to be 99.13% in some tests I did. Likely the reason is that my test ran longer, which helps drown out the parallel worker setup costs a little more. It would be interesting to see if you find that too with, say, 10 times more rows.
Also, this percentage will start to tail off with more groups, as the Finalize Aggregate stage, which runs only on the leader process, has to combine the groups from each worker. More workers likely means yet more partial groups too, which is another (small) reason why parallelisation won't stay as close to linear at higher worker counts, although this will really only be noticeable when many more groups are involved. And there are other factors coming into play there too, CPU cache thrashing etc.
For the AVG() aggregate, we will hopefully see some pending follow-on patches committed soon which allow many more aggregates to participate in parallelism.
David
The avg() aggregate parallelization is in the final 9.6 release. Doing the test with the same dataset, I got:
test=> explain analyze select avg(number) from test;
                                                               QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=127517.23..127517.24 rows=1 width=32) (actual time=519.046..519.046 rows=1 loops=1)
   ->  Gather  (cost=127516.70..127517.21 rows=5 width=32) (actual time=517.739..519.034 rows=6 loops=1)
         Workers Planned: 5
         Workers Launched: 5
         ->  Partial Aggregate  (cost=126516.70..126516.71 rows=1 width=32) (actual time=512.356..512.356 rows=1 loops=6)
               ->  Parallel Seq Scan on test  (cost=0.00..121516.76 rows=1999976 width=4) (actual time=0.023..343.913 rows=1666667 loops=6)
 Planning time: 0.165 ms
 Execution time: 520.030 ms