Grouping data into array of sums – fun with custom aggregates

I was recently asked about optimizing an interesting case. There was a table like:

=$ CREATE TABLE input_data (
    category_id        INT8,
    object_id          INT8,
    interaction_ts     timestamptz,
    interaction_type   TEXT,
    interaction_count  INT4
);

And there was code that grouped it all by sum()ing interaction_count per category, object, interaction_type, and timestamp truncated to the hour.

Basically, it stored somewhere the result of:

=$ SELECT
    category_id,
    object_id,
    date_trunc( 'hour', interaction_ts ) AS ts,
    SUM(interaction_count) FILTER (WHERE interaction_type = 'a') AS a_count,
    SUM(interaction_count) FILTER (WHERE interaction_type = 'b') AS b_count
FROM
    input_data
GROUP BY 1, 2, 3;

While talking about optimizations, one idea that came up was to store a whole day of counts in a single row, as an array. So the resulting count table would be:

=$ CREATE TABLE results (
    category_id      int8,
    object_id        int8,
    interaction_day  DATE,
    a_counts         int4[],
    b_counts         int4[]
);

Where a_counts and b_counts would always have 24 elements, one for each hour.

Now, how to roll it up like this?

This can actually be done relatively simply using a custom aggregate.

The first step, as always, is making a simple example. We have the input table, so let's put some random-ish data there. I can generate five columns of input data using a query like:

=$ SELECT
    FLOOR( 1 + random() * 2 ),
    FLOOR( 11 + random() * 2 ),
    now() - '2 hours'::INTERVAL * random(),
    CASE WHEN random() < .5 THEN 'a' ELSE 'b' END,
    FLOOR( 1 + random() * 5 );
 floor | floor |           ?column?            | case | floor 
-------+-------+-------------------------------+------+-------
     2 |    11 | 2024-11-15 13:55:39.234459+01 | b    |     2
(1 row)

And I can use it with generate_series() to make any number of such rows. For starters, let's use 20:

=$ INSERT INTO input_data (category_id, object_id, interaction_ts, interaction_type, interaction_count)
SELECT
    FLOOR( 1 + random() * 2 ),
    FLOOR( 11 + random() * 2 ),
    now() - '2 hours'::INTERVAL * random(),
    CASE WHEN random() < .5 THEN 'a' ELSE 'b' END,
    FLOOR( 1 + random() * 5 )
FROM
    generate_series(1, 20) returning *;
 category_id | object_id |        interaction_ts         | interaction_type | interaction_count 
-------------+-----------+-------------------------------+------------------+-------------------
           2 |        12 | 2024-11-15 13:26:45.825033+01 | b                |                 1
           2 |        12 | 2024-11-15 13:13:08.097662+01 | a                |                 2
           2 |        11 | 2024-11-15 14:00:14.481363+01 | b                |                 1
           2 |        11 | 2024-11-15 13:00:51.510032+01 | b                |                 4
           1 |        12 | 2024-11-15 13:49:50.655204+01 | a                |                 2
           2 |        11 | 2024-11-15 14:35:07.760246+01 | b                |                 3
           1 |        12 | 2024-11-15 14:09:45.774415+01 | b                |                 4
           2 |        12 | 2024-11-15 14:12:11.400831+01 | b                |                 2
           2 |        11 | 2024-11-15 12:54:46.544333+01 | a                |                 2
           2 |        11 | 2024-11-15 13:42:31.420343+01 | b                |                 5
           1 |        11 | 2024-11-15 13:36:47.467976+01 | a                |                 1
           1 |        12 | 2024-11-15 14:22:13.069095+01 | b                |                 3
           1 |        12 | 2024-11-15 14:33:29.341066+01 | b                |                 5
           2 |        11 | 2024-11-15 14:10:17.165473+01 | a                |                 1
           1 |        11 | 2024-11-15 13:34:39.304185+01 | b                |                 5
           2 |        12 | 2024-11-15 13:25:43.083566+01 | b                |                 2
           1 |        11 | 2024-11-15 13:10:26.565213+01 | a                |                 3
           1 |        12 | 2024-11-15 14:13:20.58935+01  | b                |                 4
           1 |        11 | 2024-11-15 13:12:04.467726+01 | b                |                 4
           1 |        12 | 2024-11-15 12:46:44.955483+01 | b                |                 1
(20 rows)

INSERT 0 20

The basic grouping/counting can be tested using the query shown previously, here with an extra ORDER BY:

=$ SELECT
    category_id,
    object_id,
    date_trunc( 'hour', interaction_ts ) AS ts,
    SUM(interaction_count) FILTER (WHERE interaction_type = 'a') AS a_count,
    SUM(interaction_count) FILTER (WHERE interaction_type = 'b') AS b_count
FROM
    input_data
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3;
 category_id | object_id |           ts           | a_count | b_count 
-------------+-----------+------------------------+---------+---------
           1 |        11 | 2024-11-15 13:00:00+01 |       4 |       9
           1 |        12 | 2024-11-15 12:00:00+01 |         |       1
           1 |        12 | 2024-11-15 13:00:00+01 |       2 |        
           1 |        12 | 2024-11-15 14:00:00+01 |         |      16
           2 |        11 | 2024-11-15 12:00:00+01 |       2 |        
           2 |        11 | 2024-11-15 13:00:00+01 |         |       9
           2 |        11 | 2024-11-15 14:00:00+01 |       1 |       4
           2 |        12 | 2024-11-15 13:00:00+01 |       2 |       3
           2 |        12 | 2024-11-15 14:00:00+01 |         |       2
(9 rows)

Sweet. And now I'd like to change it into 4 rows, one for each combination of category_id (1/2) and object_id (11/12), each containing all data for 2024-11-15.

To build such a dataset, it would be best to make an aggregate that gets the hour and interaction_count, and builds the array accordingly.

I can do:

CREATE aggregate sum_per_hour( int4, int4 ) (
    sfunc = sum_per_hour,
    stype = int8[],
    initcond = '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}'
);

Let's break it into pieces. The first line means that I will make an aggregate (like sum(), count(), avg()) named sum_per_hour, that will take two arguments, both int4.

This is not written in there, but the first argument will be the hour, in range 0 to 23, and the second is the count that should be added to the sum for the given hour.

The next line (sfunc…) tells Pg that when running this aggregate, it should call a function named sum_per_hour(…) for every aggregated row; that function will contain the logic for this aggregate.

The next line (stype) tells Pg what type is used to keep state between calls to the sum_per_hour function: an array of int8 values. Since the aggregate has no final function, this is also the type that the aggregate returns.

The final line makes it so that before the first run, the state of the aggregate will be a twenty-four element array, where each element is 0, and the array is indexed from 0 to 23 (normally Pg array indexes start with 1). This will make certain operations easier.
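To see what the [0:23] prefix does, here is a small sketch using only built-in functions. It checks the bounds of such an array, reads a single hour out of it, and shows that array_fill() can build the same all-zeros value. The column aliases are made up for this illustration:

```sql
-- Bounds and element access for an array indexed from 0 to 23.
SELECT
    array_lower( a, 1 ) AS first_index,
    array_upper( a, 1 ) AS last_index,
    a[13]               AS hour_13
FROM (
    SELECT '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,4,0,0,0,0,0,0,0,0,0,0}'::int8[] AS a
) AS x;

-- The same kind of all-zeros array, built without typing 24 zeros:
-- 24 elements of 0::int8, with the lower bound set to 0.
SELECT array_fill( 0::int8, ARRAY[24], ARRAY[0] ) AS zeros;
```

The first query should return 0, 23 and 4, and the second one should return the same value that is used as initcond.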

The CREATE AGGREGATE command will, of course, not work, because the sum_per_hour function is not there. Yet. So, let's write it.

The function behind an aggregate has one more parameter than the aggregate itself, and it is used to pass “state”. So, the function will have to look like:

CREATE FUNCTION sum_per_hour( INOUT p_state int8[], IN p_hour int4, IN p_count int4 ) RETURNS int8[]

It will get state as first argument, then hour and count as next ones, and will return modified state (which is also result after all rows have been aggregated).
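If the relation between the aggregate and its function is not obvious yet, a toy example might help. The sketch below is not part of the hourly solution, and the names toy_product_step and toy_product are made up for this illustration. The function takes the state plus one value, while the aggregate built on it takes just the value:

```sql
-- State function: two parameters, the state so far and the next value.
CREATE FUNCTION toy_product_step( p_state int8, p_value int8 ) RETURNS int8
LANGUAGE sql IMMUTABLE AS $$
    SELECT p_state * p_value
$$;

-- Aggregate: one parameter only, the state is handled by Pg.
CREATE AGGREGATE toy_product( int8 ) (
    sfunc = toy_product_step,
    stype = int8,
    initcond = '1'
);

-- Pg starts with state 1, then calls toy_product_step() once per row.
SELECT toy_product( x::int8 ) FROM generate_series( 1, 5 ) AS x;
```

The last query should return 120, that is 1 * 1 * 2 * 3 * 4 * 5.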

There is nothing left to do now aside from writing the function 🙂 Luckily it will be a rather simple one:

=$ CREATE FUNCTION sum_per_hour( INOUT p_state int8[], IN p_hour int4, IN p_count int4 ) RETURNS int8[] LANGUAGE plpgsql AS $$
DECLARE
BEGIN
    -- sanity checks
    IF p_hour < 0 THEN
        raise exception 'Hour can''t be < 0 : %', p_hour;
    END IF;
    IF p_hour > 23 THEN
        raise exception 'Hour can''t be > 23 : %', p_hour;
    END IF;
 
    -- actual count modification
    p_state[ p_hour ] := p_state[ p_hour ] + p_count;
 
    RETURN;
END;
$$;

As you can see, most of the code is sanity checks for the hour value 🙂

We can check how it works:

=$ SELECT sum_per_hour( '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}', 1, 15 );
                       sum_per_hour                        
-----------------------------------------------------------
 [0:23]={0,15,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}
(1 row)
 
=$ SELECT sum_per_hour( '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}', -1, 15 );
ERROR:  Hour can't be < 0 : -1
CONTEXT:  PL/pgSQL function sum_per_hour(bigint[],integer,integer) line 5 at RAISE
 
=$ select sum_per_hour( '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}', 24, 15 );
ERROR:  Hour can't be > 23 : 24
CONTEXT:  PL/pgSQL function sum_per_hour(bigint[],integer,integer) line 8 at RAISE

Looks promising. With the function in place I can add the aggregate:

=$ CREATE aggregate sum_per_hour( int4, int4 ) (
    sfunc = sum_per_hour,
    stype = int8[],
    initcond = '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}'
);

So far, so good. Let's try it now:

=$ SELECT
    category_id,
    object_id,
    date_trunc( 'day', interaction_ts ) AS interaction_date,
    sum_per_hour( EXTRACT( 'hour' FROM interaction_ts)::int4, interaction_count ) FILTER (WHERE interaction_type = 'a') AS a_counts,
    sum_per_hour( EXTRACT( 'hour' FROM interaction_ts)::int4, interaction_count ) FILTER (WHERE interaction_type = 'b') AS b_counts
FROM
    input_data
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3;
 category_id | object_id |    interaction_date    |                         a_counts                         |                         b_counts                          
-------------+-----------+------------------------+----------------------------------------------------------+-----------------------------------------------------------
           1 |        11 | 2024-11-15 00:00:00+01 | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,4,0,0,0,0,0,0,0,0,0,0} | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,9,0,0,0,0,0,0,0,0,0,0}
           1 |        12 | 2024-11-15 00:00:00+01 | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0} | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,1,0,16,0,0,0,0,0,0,0,0,0}
           2 |        11 | 2024-11-15 00:00:00+01 | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,2,0,1,0,0,0,0,0,0,0,0,0} | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,9,4,0,0,0,0,0,0,0,0,0}
           2 |        12 | 2024-11-15 00:00:00+01 | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0} | [0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,3,2,0,0,0,0,0,0,0,0,0}
(4 rows)

All done. Well, kinda. There is also the issue of merging data. The problem is that we might cache the counts in some side table, and then get a new set of interactions, so we should have a way to merge them together. So if for some category_id/object_id/day we have a_counts:

[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0}

and in new data we would have:

[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,12,4,0,0,0,0,0,0,0,0,0,0}

Then after merging we should get:

[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,12,6,0,0,0,0,0,0,0,0,0,0}

This can be done using a simple function:

=$ CREATE FUNCTION sum_hour_arrays( IN p_left int8[], IN p_right int8[] ) RETURNS int8[] AS $$
DECLARE
    i int4;
    v_result int8[];
BEGIN
    FOR i IN 0..23 LOOP
        v_result[i] := p_left[i] + p_right[i];
    END LOOP;
    RETURN v_result;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION
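A quick check, in the same spirit as the earlier tests of sum_per_hour. It assumes the sum_hour_arrays function was created exactly as shown above, and feeds it the two arrays from the merging example:

```sql
-- Existing counts on the left, new counts on the right.
SELECT sum_hour_arrays(
    '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0}',
    '[0:23]={0,0,0,0,0,0,0,0,0,0,0,0,12,4,0,0,0,0,0,0,0,0,0,0}'
);
```

This should return the merged array shown earlier, with 12 for hour 12 and 6 for hour 13. Note that the function expects both arrays to be indexed from 0 to 23. If an element is missing in either of them (for example, when the array is indexed from 1), the sum for that hour comes out as NULL.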

Great. With this function we can now use MERGE to store aggregated data, and add to the aggregates later on 🙂
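A sketch of what such a MERGE could look like. It assumes PostgreSQL 15 or newer (where MERGE is available), and the input_data table, results table, sum_per_hour aggregate and sum_hour_arrays function from this post. For simplicity it aggregates everything in input_data; a real job would need some way to pick only the rows that were not counted before, which is not covered here:

```sql
MERGE INTO results AS r
USING (
    -- Freshly aggregated counts, one row per category/object/day.
    SELECT
        category_id,
        object_id,
        interaction_ts::date AS interaction_day,
        sum_per_hour( EXTRACT( 'hour' FROM interaction_ts)::int4, interaction_count ) FILTER (WHERE interaction_type = 'a') AS a_counts,
        sum_per_hour( EXTRACT( 'hour' FROM interaction_ts)::int4, interaction_count ) FILTER (WHERE interaction_type = 'b') AS b_counts
    FROM
        input_data
    GROUP BY 1, 2, 3
) AS n
    ON  r.category_id     = n.category_id
    AND r.object_id       = n.object_id
    AND r.interaction_day = n.interaction_day
WHEN MATCHED THEN
    -- Row for this day already exists: add new counts hour by hour.
    UPDATE SET
        a_counts = sum_hour_arrays( r.a_counts, n.a_counts ),
        b_counts = sum_hour_arrays( r.b_counts, n.b_counts )
WHEN NOT MATCHED THEN
    -- First data for this day: store the arrays as they are.
    INSERT ( category_id, object_id, interaction_day, a_counts, b_counts )
    VALUES ( n.category_id, n.object_id, n.interaction_day, n.a_counts, n.b_counts );
```

The aggregate and the merging function work on int8[], while the results table uses int4[] columns, so this relies on Pg casting between the two array types when calling the function and when storing its result.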
