Going from generator coroutines to Kamaelia Components

May 17, 2009 at 02:09 AM | categories: python, oldblog | View Comments

Earlier this evening an announcement by Pete Fein regarding the formation of a python-concurrency mailing list (aka the Python Concurrency Special Interest Group) bounced into my inbox via the unladen-swallow mailing list. Naturally, given my work on Axon (a concurrency library) and Kamaelia (a bunch of components that use it), it jumped out at me as interesting. (There's a 5 minute overview for those who don't know what Kamaelia is...)

Pete then posted to the list a small collection of different ways of implementing...
#!/bin/sh
tail -f /var/log/system.log |grep pants
... in python.

This struck me as a nice example worth comparing to Kamaelia - specifically to compare the generator version. Indeed the Kamaelia version is fairly directly derived from the generator version, which is why I'm picking on it. The full generator version looks like this:
import time
import re

def follow(fname):
    f = open(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

def printer(lines):
    for l in lines:
        print l.strip()

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass

Now, this is nice, but the core logic:
tail -f /var/log/system.log |grep pants
has somehow become obfuscated at the end - which is a shame, because it doesn't need to be. Taking the Kamaelia version step by step, let's start with that end part:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass
... and turn it into something which looks like what you'd do in Kamaelia:
from Kamaelia.Chassis.Pipeline import Pipeline

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()
(incidentally, to actually use separate processes, à la the shell version, you'd use ProcessPipeline, not Pipeline)

So in order for this to be valid, we now need to adapt follow into Follow, grep into Grep, and printer into Printer.

Let's start by adapting follow:
def follow(fname):
    f = open(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l
First of all, we note that this follow source blocks - specifically calling time.sleep. Now there are other ways of doing this, but the simplest way of keeping this code structure is to just create a threaded component. We also need to capture the argument (fname) which isn't optional and has no logical default, so we need to have an __init__ method. So we grab that, store it somewhere handy, and call the super class initialiser. Pretty standard stuff.
import Axon
class Follow(Axon.ThreadedComponent.threadedcomponent):
    def __init__(self, fname, **argv):
        self.fname = fname
        super(Follow,self).__init__(**argv)
    def main(self):
        f = open(self.fname)
        f.seek(0,2) # go to the end

        while not self.dataReady("control"):
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                self.send(l, "outbox")

        f.close()

        self.send(self.recv("control"), "signal")
The main() method of this component now follows the same core logic as the follow() generator. The only addition is control logic - a fairly commonly used pattern which looks like this:
        while not self.dataReady("control"):
            <Body of generator>
        self.send(self.recv("control"), "signal")
This allows someone to shut down our component cleanly. Beyond that, the major difference is that this:
            yield l
Has been replaced with this:
            self.send(l, "outbox")

i.e. rather than expecting to be called in a "call me & handle my values" loop, it just passes its results out of an outbox called "outbox" (much like tail -f spits its results out on stdout).

The next part to convert is grep(), which you'll recall looks like this:
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l
With the Kamaelia equivalent looking like this:
class Grep(Axon.Component.component):
    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."
    def main(self):
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")

            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

Once again the overall logic is unchanged, and again the only addition is the shutdown logic.

However, we don't need to explicitly pass in a location for lines to enter this component. We just expect to be passed lines from our standard inbox "inbox", in the same way normal grep expects data on stdin.

Also, because we have a logical default for the pattern (match everything using "."), we don't need an __init__ method. This is because the baseclass for all components does this in its __init__ method:
   def __init__(self, *args, **argd):
..
      self.__dict__.update(argd)

So calling Grep(pattern=".*pants.*") will specialise the component's value of pattern. (The reason for doing it this way is that it actively enables monkey-patching as reconfiguration, as you can see in this greylisting mail server.)
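The effect of that baseclass trick is easy to see in miniature. This is a simplified sketch of the idiom, not Axon's actual class - keyword arguments become instance attributes, shadowing the class-level defaults:

```python
class component(object):
    """Sketch of the keyword-to-attribute idiom (simplified, not Axon's code)."""
    def __init__(self, *args, **argd):
        # Any keyword argument overrides the matching class attribute,
        # for this instance only.
        self.__dict__.update(argd)

class Grep(component):
    pattern = "."  # logical default lives on the class, so no __init__ needed

assert Grep().pattern == "."                             # default from the class
assert Grep(pattern=".*pants.*").pattern == ".*pants.*"  # per-instance override
assert Grep.pattern == "."                               # the class itself is untouched
```

Because the override lands in the instance's __dict__, the class default stays intact for every other instance - which is what makes per-component reconfiguration (and monkey-patching) cheap.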

Grep is actually a special case of a filter, and as a result really just wants to take data from its standard inbox and pass matching items to its standard outbox. Which, if you look at the core logic, is precisely what it does:
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")


After clearing its inbox, it pauses and releases control to the scheduler (pausing is a component's way of letting the scheduler know "don't call me unless there's some data for me to work on").
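That pause/resume dance can be illustrated with a toy round-robin scheduler - all the names here are made up for illustration, and this is nothing like Axon's real scheduler:

```python
class ToyComponent(object):
    """Toy component with an inbox and a pause flag (not Axon's real classes)."""
    def __init__(self):
        self.inbox = []
        self.paused = False
        self.seen = []

    def deliver(self, item):
        self.inbox.append(item)
        self.paused = False  # new data wakes the component up

    def pause(self):
        self.paused = True

    def main(self):
        while True:
            while self.inbox:                     # drain everything waiting
                self.seen.append(self.inbox.pop(0))
            self.pause()                          # nothing left: don't schedule me
            yield 1

def schedule(component, steps):
    """Toy scheduler loop: skip paused components instead of resuming them."""
    g = component.main()
    for _ in range(steps):
        if not component.paused:
            next(g)

c = ToyComponent()
c.deliver("hello")
schedule(c, 5)          # runs once, drains the inbox, then stays paused
assert c.seen == ["hello"]
assert c.paused
```

The point of the sketch is the `if not component.paused` test: a paused component costs the scheduler nothing until fresh data flips the flag back.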

Finally, we need to implement the Printer() component. Again we can adapt this generator based version, thus:
def printer(lines):
    for l in lines:
        print l.strip()
Which transforms into this:
class Printer(Axon.Component.component):
    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                print l.strip()

            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

Again the core logic is the same as the generator version, with the usual control logic wrapped around it. Again, rather than needing the explicit "lines" iterator, we have a standard place for data to come into the component - the inbox "inbox" - leaving the rest of the logic essentially unchanged.
Finally, in the same way the generator version puts the whole thing together:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass
We can put the Kamaelia components together:
Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()
This is very different. The generator version creates a chain of iterators, passing them in as the first parameter to the next one in the chain, with the last item in the chain (the for loop) being the one that pulls things along. The Kamaelia version instantiates a Pipeline component which is very clearly piping the outputs from one component into the inputs of the following one.
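To make that contrast concrete, the generator version's chaining can be captured in a small helper. This pull_pipeline function is hypothetical - it's not part of Kamaelia - and the stages are simplified stand-ins for follow and grep:

```python
def pull_pipeline(source, *stages):
    """Chain generator stages: each stage takes the previous one as input.

    Nothing runs until the caller iterates the result - the consumer at
    the end of the chain is what pulls values through every stage.
    """
    it = source
    for stage in stages:
        it = stage(it)
    return it

# Hypothetical stand-in stages, just for illustration:
def upper(lines):
    for l in lines:
        yield l.upper()

def exclaim(lines):
    for l in lines:
        yield l + "!"

out = list(pull_pipeline(iter(["a", "b"]), upper, exclaim))
assert out == ["A!", "B!"]
```

Even with the helper, the chain is still pull-driven: if nobody iterates the final generator, nothing happens - which is exactly the property the Pipeline component does away with.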

In fact the Pipeline component creates linkages between the components in the pipeline. These are conceptually similar to Stackless's channels (though inspired by Unix pipes and occam). Implementation-wise, a linkage actually collapses an inbox into an outbox. As a result, when Follow sends a message to its outbox "outbox", that message is instantly delivered into the inbox "inbox" of Grep. Inboxes for generator based components are just plain old lists, and for threaded components they're threadsafe queues.
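Here's a toy sketch of that collapsing trick (made-up names, not Axon's actual code): after linking, the sender's outbox and the receiver's inbox are literally the same list, so delivery is instant and copy-free:

```python
class Box(object):
    """Toy component mailboxes: named outboxes and inboxes, backed by lists."""
    def __init__(self):
        self.outboxes = {"outbox": []}
        self.inboxes = {"inbox": []}

    def send(self, item, boxname="outbox"):
        self.outboxes[boxname].append(item)

def link(source, sink):
    """Collapse: make the source's outbox BE the sink's inbox list."""
    source.outboxes["outbox"] = sink.inboxes["inbox"]

follow, grep = Box(), Box()
link(follow, grep)
follow.send("a line of log")
assert grep.inboxes["inbox"] == ["a line of log"]  # delivered instantly
```

Nothing moves the message anywhere - appending to one end of the linkage is appending to the other, which is why delivery between generator-based components is effectively free.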

The nice thing about this is that if you wanted to use this in a network server, you could do this:

from Kamaelia.Chassis.ConnectedServer import ServerCore

def FollowPantsInMyLogs(*args):
    return Pipeline(
             Follow('/var/log/system.log'),
             Grep(".*pants.*"),
           )

ServerCore(protocol=FollowPantsInMyLogs, port=8000).run()
Though that opens a Follow & Grep component for every client. To share a single Follow component, you could follow the file once and publish the data to all clients:

from Kamaelia.Util.Backplane import *

Backplane("PANTSINMYLOGS").activate()

Pipeline(
      Follow('/var/log/system.log'),
      Grep(".*pants.*"),
      PublishTo("PANTSINMYLOGS"),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()

You could also monitor this on the server console by adding in a local printer:
Backplane("PANTSINMYLOGS").activate()

Pipeline(
      Follow('/var/log/system.log'),
      Grep(".*pants.*"),
      PublishTo("PANTSINMYLOGS"),
).activate()

Pipeline(
      SubscribeTo("PANTSINMYLOGS"),
      Printer(),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()
... which all fairly naturally describes the higher level co-ordination going on. Now, you can do all this from the ground up using plain old generators, but personally I find this approach easier to follow. Some people find other approaches simpler :)
Anyway, for the curious reader, I hope this is a useful/interesting comparison of a non-Kamaelia based approach with a Kamaelia based approach, with a visible demonstration at the end of why I prefer the latter :) ... and all this ignores graphlines too :)
