I recently needed to add some notifications between processes in an app I’m building and thought I’d try something different. Traditionally I’ve reached for things like Redis, RabbitMQ, AWS SNS, or Kafka for this problem, but I’m trying to go full postgres-for-everything mode where possible – I was inspired by this post a few years back and have selectively been trying some of these approaches. I’ve already started using Postgres successfully for my job queue (maybe I’ll do a quick post on that at some point), so now I figured it’s worth trying to roll my own Pub/Sub setup.

NOTIFY and LISTEN

Postgres has native Pub/Sub capabilities via the NOTIFY and LISTEN commands, so it’s really just a matter of wiring them up to your application. The beautiful thing about doing this natively in the database is that the visibility of these notifications is scoped to the transaction they are sent in. This is a minor pet peeve I have with using a separate system for notifications: you cannot guarantee that:

  1. The entity exists when you publish the event. Depending on how you structure your events/messages, if a consumer needs to re-query for more information the entity likely exists – but if your message is published and consumed faster than the entity is persisted to the DB, you’re in an interim state where the consumer has to keep retrying until it arrives.
BEGIN

-- create some widget, give it an id, XYZ

-- publish notification that widget XYZ was created

<---  Until the tx is written to the db, widget XYZ is not visible

COMMIT
  2. The entity will ever exist. This is basically the worst case of the above example. Instead of the commit succeeding, there’s, say, an integrity error when attempting to commit the transaction, and the whole thing is rolled back. Now you have published an event for an entity that will never exist.
BEGIN

-- create some widget, give it an id, XYZ

-- publish notification that widget XYZ was created

COMMIT  <--- Goes boom, and you need to roll back the tx.

I have hit both of these situations in the past, and there are certainly workarounds, but they involve a lot of extra code.
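
NOTIFY side-steps both problems because notifications are transactional: they are only delivered if and when the publishing transaction commits, and a rollback throws them away. Here’s a rough sketch of that guarantee (reusing the connection string from the examples below, with the widget creation itself elided):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg://postgres:postgres@localhost:5432/postgres")

with engine.connect() as conn:
    # ... create some widget here, give it an id, XYZ ...
    conn.execute(text("SELECT pg_notify('my_channel', 'widget XYZ created')"))
    # the notification is discarded along with the transaction, so no listener
    # ever sees an event for a widget that doesn't exist
    conn.rollback()

Swap the rollback() for a commit() and the notification goes out atomically with the widget itself.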

Pub/Sub in practice

It’s actually quite simple to get this working. Here’s a Python example using SQLAlchemy; SQLAlchemy doesn’t expose this feature directly, so we drop down to the underlying database driver (psycopg), which does support it. Note that we need isolation_level='AUTOCOMMIT' to stop SQLAlchemy from automatically wrapping everything in a transaction, which doesn’t play nicely with this setup.

from psycopg import Connection
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg://postgres:postgres@localhost:5432/postgres")

# AUTOCOMMIT so the LISTEN isn't left sitting inside an open, uncommitted transaction
with engine.connect().execution_options(
    isolation_level="AUTOCOMMIT"
) as conn:
    # SQLAlchemy doesn't expose LISTEN/NOTIFY, so grab the raw psycopg connection
    driver_conn: Connection = conn.connection.driver_connection
    driver_conn.execute("LISTEN my_channel")

    print("Waiting for messages...")

    # notifies() is a generator that blocks until notifications arrive
    for message in driver_conn.notifies():
        print(f"Got message: {message}")

And the producer:

from sqlalchemy import create_engine, text
from time import sleep

engine = create_engine("postgresql+psycopg://postgres:postgres@localhost:5432/postgres")

with engine.connect() as conn:
    for i in range(3):
        # pg_notify(channel, payload) is the function form of NOTIFY; use a bound
        # parameter for the payload rather than interpolating it into the SQL
        sql = text("SELECT pg_notify('my_channel', :payload)")
        conn.execute(sql, {"payload": f"Hello world #{i}"})
        print(f"Published #{i}")
        sleep(1)

    # none of the notifications are delivered to listeners until this commit
    conn.commit()
    print("Committed!")

Putting those together, we get something like the following – notice the messages are all delivered at the time of commit, woohoo!
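
If you run the two scripts in separate terminals (the file names here are just placeholders), the interleaving looks roughly like this – a sketch of the expected behaviour, not captured output:

$ python consumer.py        # terminal 1
Waiting for messages...

$ python producer.py        # terminal 2
Published #0
Published #1
Published #2
Committed!

# only now does terminal 1 print anything:
Got message: Notify(channel='my_channel', payload='Hello world #0', pid=...)
Got message: Notify(channel='my_channel', payload='Hello world #1', pid=...)
Got message: Notify(channel='my_channel', payload='Hello world #2', pid=...)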

Gotchas

This is a super simple solution to the problem that needs no extra framework or infra. It’s not perfect, but it’s another tool to add to your toolbox when you need some basic messaging between processes. A few gotchas:

  1. You probably want to use a dedicated db connection to consume the messages – see the sketch after the heartbeat example below.
  2. This is a simple Pub/Sub model – if you are not listening when a message is sent, you will not receive it. You’ll need something fancier if you need stronger guarantees, but this provides a solid foundation for more complex topologies.
  3. You probably want some sort of heartbeat to ensure the notification connection is still alive. Here’s a slightly improved version of the above example:
from psycopg import Connection
from sqlalchemy import create_engine
from time import time

engine = create_engine("postgresql+psycopg://postgres:postgres@localhost:5432/postgres")

with engine.connect().execution_options(
    isolation_level="AUTOCOMMIT"
) as conn:
    driver_conn: Connection = conn.connection.driver_connection
    driver_conn.execute("LISTEN my_channel")

    print("Waiting for messages...")

    last_heartbeat = 0
    while True:
        # every 60 seconds, run a trivial query; if the connection has died,
        # this raises instead of silently listening to nothing
        if time() - last_heartbeat > 60:
            print("Checking if connection still alive")
            driver_conn.execute("SELECT 1")
            last_heartbeat = time()

        # wait up to 1 second for notifications, then loop back to the heartbeat check
        for message in driver_conn.notifies(timeout=1):
            print(f"Got message: {message}")