HOWTO: Automatically Responding To PostgreSQL Table Changes With Twisted

Many times we want a script to do something when our database has been modified. But how do we know when that happens? It is common to poll the server periodically. Maybe our application has a button to refresh some data. Check email is an example. It has a check email button and can be configured to periodically check for new mail. In effect it is just pressing it's own button every minute or so. This is wrong for so many reasons.

Are we there yet? No. Are we there yet? No. Are we there yet? No. All the way to Disney Land. This is just wrong. The following is much better - Tell me when we get to Disney Land. We are at Disney Land. Sure you can write a server script that sends a message to the client whenever a table changes but now the server script is the one polling the database. Wouldn't it be better if the database just notified the client?

If you have five minutes I will describe a technique that can be used for any number of things including the following:

What?

If you are using PostgreSQL for your database server you can take advantage of a really cool feature called notify to send signals between connections to the same database. Usage is simple.

LISTEN at_disney_land;
NOTIFY at_disney_land;

There is nothing else to it. If you open psql in a window and execute the command 'listen at_disney_land;' then you will get a notification when someone connected to the same database executes the command 'notify at_disney_land;'.

Using this feature in conjunction with triggers allows for alerting all interested clients that something has happened. Your Twisted program can execute a function automatically without needing to poll. This document describes a method of doing just that. This may not be the best method and it may not be the most Twisted method. For me it is the easiest method. I am new to Twisted so if there is a better way please document and share! I searched with Google for how to do this and it felt like I was in a Bing commercial.

For this example one could put the notify in a rule. Trigger functions for me are just simpler. Not only that but, while not used in this example, PostgreSQL supports this really sick procedural language.

Unfortunately there is no way to pass any kind of payload data along with the notification - it's just a string. Our PL/pgSQL function will use string concatenation so we will at least know which operation occured on which table.

Getting Right Down To It

You need the psycopg2 driver installed. The following class depends on it. Either get over it or change it. Psycho Pig Two is the coolest name for a database driver! You can still use whatever driver you want for the rest of your Twisted application. You will just have a dedicated connection for receiving your asynchronous notifications on.

Oh and you need Twisted and Python but if you don't already know that by now then stop reading this and go to Disney Land.

Here is the class that we will extend in a moment:

asyncnotify.py

class AsyncNotify():
        '''Class to trigger a function via PostgreSQL NOTIFY messages. 
Refer to the documentation for more information on LISTEN, NOTIFY and UNLISTEN. 
http://www.postgresql.org/docs/8.3/static/sql-notify.html

This obscure feature is very useful. You can create a trigger function on a
table that executes the NOTIFY command. Then any time something is inserted,
updated or deleted your Python function will be called using this class.

As far as I know this is unique to PostgreSQL. I have no use for any other server :-)'''

        def __init__(self, dsn):
                '''The dsn is passed here. This class requires the psycopg2 driver.'''
                import psycopg2
                self.conn = psycopg2.connect(dsn)
                self.conn.set_isolation_level(0)
                self.curs = self.conn.cursor()
                self.__listening = False

        def __listen(self):
                from select import select
                if self.__listening:
                        return 'already listening!'
                else:
                        self.__listening= True
                        while self.__listening:
                                if select([self.curs],[],[],60)!=([],[],[]) and self.curs.isready():
                                        if self.curs.connection.notifies:
                                                pid, notify = self.curs.connection.notifies.pop()
                                                self.gotNotify(pid, notify)

        def addNotify(self, notify):
                '''Subscribe to a PostgreSQL NOTIFY'''
                sql = "LISTEN %s" % notify
                self.curs.execute(sql)

        def removeNotify(self, notify):
                '''Unsubscribe a PostgreSQL LISTEN'''
                sql = "UNLISTEN %s" % notify
                self.curs.execute(sql)

        def stop(self):
                '''Call to stop the listen thread'''
                self.__listening = False

        def run(self):
                '''Start listening in a thread and return that as a deferred'''
                from twisted.internet import threads
                return threads.deferToThread(self.__listen)

        def gotNotify(self, pid, notify):
                '''Called whenever a notification subscribed to by addNotify() is detected.
Unless you override this method and do someting this whole thing is rather pointless.'''
                pass

Next we have a script that shows an example of how to use the class. Change host, user and password. But don't run it just yet because it won't work.

In the next section we will create a database for use with this script and set it up with the PL/pgSQL language. We will create a couple of test tables and a trigger function which will issue a notify of the table name that called it concatinated with an underscore and the operation (insert, update or delete). Then we will create the actual triggers on each of the test tables that will call this function before anything is inserted, updated or deleted.

asyncnotify-example.py
from asyncnotify import AsyncNotify
from twisted.internet import reactor

dbname = 'mytestdb'
host = 'localhost'
user = 'your_username'
password = 'your_secret_password'

dsn = 'dbname=%s host=%s user=%s password=%s' % (dbname, host, user, password)

def errorHandler(error):
        print str(error)
        notifier.stop()
        reactor.stop()

def shutdown(notifier):
        print 'Shutting down the reactor'
        reactor.stop()

def tableUpdated(notify, pid):
        # This function is called by magic.
        tablename, op = notify.split('_')
        print '%s just occured on %s from process ID %s' % (op, tablename, pid)

class myAsyncNotify(AsyncNotify):
        # gotNotify is called with the notification and pid.
        # Override it and do something great.
        def gotNotify(self, pid, notify):
                if notify == 'quit':
                        # The stop method will end the deferred thread
                        # which is listening for notifications.
                        print 'Stopping the listener thread.'
                        self.stop()
                elif notify.split('_')[0]  in ('table1', 'table2'):
                        tableUpdated(notify, pid)
                else:
                        print "got asynchronous notification '%s' from process id '%s'" % (notify, pid)

notifier = myAsyncNotify(dsn)

# Start listening for subscribed notifications in a deferred thread.
listener = notifier.run()

# What to do when the AsyncNotify stop method is called to
listener.addCallback(shutdown)
listener.addErrback(errorHandler)

# Call the gotNotify method when any of the following notifies are detected.
notifier.addNotify('test1')
notifier.addNotify('test2')
notifier.addNotify('table1_insert')
notifier.addNotify('table1_update')
notifier.addNotify('table1_delete')
notifier.addNotify('table2_insert')
notifier.addNotify('table2_update')
notifier.addNotify('table2_delete')
notifier.addNotify('quit')

# Unsubscribe from one
reactor.callLater(15, notifier.removeNotify, 'test2')

reactor.run()


Create Test Database

Let's create a database to test this out. I am assuming that you have PostgreSQL configured and that you have a role already defined.

   createdb mytestdb

Now connect to the new test database.

   psql mytestdb

Create some test tables.

CREATE TABLE table1 (
        message text
);
CREATE TABLE table2 (
        message text
);

Hook up our database with PL/pgSQL language.

CREATE PROCEDURAL LANGUAGE plpgsql;

Create a trigger function in PL/pgSQL.

CREATE FUNCTION notify_trigger() RETURNS trigger AS $$

DECLARE

BEGIN
 -- TG_TABLE_NAME is the name of the table who's trigger called this function
 -- TG_OP is the operation that triggered this function: INSERT, UPDATE or DELETE.
 execute 'NOTIFY ' || TG_TABLE_NAME || '_' || TG_OP;
 return new;
END;

$$ LANGUAGE plpgsql;

Create triggers on the test tables

CREATE TRIGGER table1_trigger BEFORE insert or update or delete on table1 execute procedure notify_trigger();
CREATE TRIGGER table2_trigger BEFORE insert or update or delete on table2 execute procedure notify_trigger();

We're all done.

Testing It Out

Run asyncnotify-example.py in another window. Each of the following SQL statements should cause the script to print a line of output.

notify test1;
notify test2;

Well the second notify only works for fifteen seconds after you started asyncnotify-example.py because it gets removed by reactor.callLater.

Now the magic you have all been waiting for. Execute these statements and watch the asyncnotify-example.py window:

INSERT INTO table1 VALUES ('hello');
UPDATE table1 SET message = 'hello, world';
DELETE FROM table1;
INSERT INTO table2 VALUES ('hello');
UPDATE table2 SET message = 'hello, world';
DELETE FROM table2;

After you have had your fun execute this command

notify quit;

Comments, Suggestions or Donations?

I hope you find this technique useful. free to send comments or suggestions to steveATdivillo_com.