Some time ago, I wrote a blog post about how to pick a task from a queue, without locking.
It was written in 2013, and as such it couldn't reflect everything we have now in PostgreSQL – namely SKIP LOCKED – which was added over a year later.
Two people mentioned SKIP LOCKED in the comments, but at that time it hadn't even been committed to the git repository. Now we have an officially released version of PostgreSQL (9.5) with this mechanism, so let's see what it can do.
I used the scripts that were created for the old blog post:
generate_random_data.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import string
import random
import time

def line_generator(use_time):
    size = 50
    chars = string.letters + string.digits
    random_string = ''.join(random.choice(chars) for x in range(size))
    # priority depends on how many leading characters repeat the first one
    priority = 49 - len(random_string.lstrip(random_string[0]))
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(use_time))
    # tab-separated line: job, priority, added_on – ready for \copy
    return '%s\t%s\t%s\n' % (random_string, str(priority), timestamp)

start_time = time.time() - 100000000
f = file('/tmp/md5_data.txt', 'w')
for i in xrange(1, 10001):
    f.write(line_generator(start_time))
    start_time += 1
f.close()
and, processor-3.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys

def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()

conn = psycopg2.connect("port=5960 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    # pick the highest-priority, oldest row that no other worker has
    # claimed, using an advisory lock as the "is it taken?" marker
    cur.execute('''
        select *
        from queue
        where pg_try_advisory_xact_lock(123, hashtext( priority::text || added_on::text ) )
        order by priority desc, added_on asc
        limit 1
        for update
    ''')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed = processed + 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
cur.close()
conn.close()
print "processed %d rows." % (processed,)
The table was created and populated, as previously, with:
$ CREATE TABLE queue (
    job       text,
    priority  int4,
    added_on  timestamptz
);

$ \copy queue FROM /tmp/md5_data.txt

$ CREATE INDEX queue_idx ON queue (priority DESC, added_on ASC);
Sanity checking:
=$ for i in {1..8}; do ./processor-3.py & done; time wait
[1] 16248
[2] 16249
[3] 16250
[4] 16251
[5] 16252
[6] 16253
[7] 16254
[8] 16255
processed 1249 rows.
processed 1251 rows.
processed 1248 rows.
processed 1251 rows.
processed 1250 rows.
processed 1252 rows.
processed 1249 rows.
processed 1250 rows.
[1]   Done                    ./processor-3.py
[2]   Done                    ./processor-3.py
[3]   Done                    ./processor-3.py
[4]   Done                    ./processor-3.py
[5]   Done                    ./processor-3.py
[6]   Done                    ./processor-3.py
[7]-  Done                    ./processor-3.py
[8]+  Done                    ./processor-3.py

real    0m15.792s
user    0m1.056s
sys     0m0.456s
Looks like I processed all 10000 rows, and it took almost 16 seconds.
So, now let's see how it would work using SKIP LOCKED.
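Before putting it into a processor, here is a minimal sketch of the semantics, using two concurrent psql sessions (the session labels are mine, and which rows you see depends on your data):

-- session 1: grab the top job and keep the transaction open
begin;
select * from queue order by priority desc, added_on asc limit 1 for update;

-- session 2, while session 1 is still open: without skip locked this
-- select would block until session 1 commits or rolls back; with it,
-- we immediately get the next not-yet-locked row instead
select * from queue order by priority desc, added_on asc limit 1 for update skip locked;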
I need to create a new processor – let's name it “processor-4.py” – with this source:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys

def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()

conn = psycopg2.connect("port=5960 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    # no advisory locks needed – SKIP LOCKED simply ignores rows
    # that other workers have already locked with FOR UPDATE
    cur.execute('''
        select *
        from queue
        order by priority desc, added_on asc
        limit 1
        for update skip locked
    ''')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed = processed + 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
cur.close()
conn.close()
print "processed %d rows." % (processed,)
Running it with 8 worker processes looked like this:
=$ for i in {1..8}; do ./processor-4.py & done; time wait
[1] 17107
[2] 17108
[3] 17109
[4] 17110
[5] 17111
[6] 17112
[7] 17113
[8] 17114
processed 1252 rows.
processed 1249 rows.
processed 1248 rows.
processed 1252 rows.
processed 1249 rows.
processed 1249 rows.
processed 1251 rows.
processed 1250 rows.
[1]   Done                    ./processor-4.py
[2]   Done                    ./processor-4.py
[4]   Done                    ./processor-4.py
[6]   Done                    ./processor-4.py
[7]-  Done                    ./processor-4.py
[3]   Done                    ./processor-4.py
[5]-  Done                    ./processor-4.py
[8]+  Done                    ./processor-4.py

real    0m15.579s
user    0m1.100s
sys     0m0.404s
The time difference is negligible, but the biggest benefit is that advisory locks stay free for other purposes. And this is clearly a win.
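As a bonus, the same pattern extends naturally to reserving more than one job per query – something that needed extra care with the advisory-lock trick, since the lock function gets called for every row the scan touches. A sketch (not benchmarked here):

-- reserve up to 10 unclaimed jobs in one round trip
select * from queue order by priority desc, added_on asc limit 10 for update skip locked;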
In the comments, Aleksander wondered how many TPS PostgreSQL can handle when it's used as a message queue, in the case where the working set fits in memory, and posted benchmark results from a more or less regular server (12 cores, 24 GB RAM, HDD).
His schema.sql, postgresql.conf, kv.pgbench, and benchmark.sh lost their formatting in the comment, but plain-text versions are available at http://paste.ubuntu.com/16234677/.
He got 104,000 TPS, sometimes dropping to 90K TPS (when buffers were synced to disk). Without `synchronous_commit=off` he got 1,400 TPS, but, as he noted, it's doubtful that any message queue that honestly fsyncs every enqueued message would show better results.
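The heavy lifting there is done by synchronous_commit; as a rough sketch of that part of the setup (my reconstruction, not his actual files – those are at the paste link above):

# postgresql.conf – don't wait for WAL flush on commit; a crash can
# lose the last few transactions, but each commit gets much cheaper:
synchronous_commit = off

# hypothetical run; kv.pgbench would hold the enqueue/dequeue statements:
pgbench -n -c 12 -j 12 -T 60 -f kv.pgbench depesz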