Kamaelia components from decorated generators. Pythonic concurrency?

October 04, 2009 at 10:52 PM | categories: python, oldblog | View Comments

A few months ago, there was a thread on the then google group python-concurrency about some standard forms for showing how some libraries deal with concurrent problems. The specific example chosen looked like this:
#!/bin/sh
tail -f /var/log/system.log |grep pants
Pete Fein also posted an example of this using generators, based on David Beazley's talk on python generators being used as (limited) coroutines:
    import time
    import re

    def follow(fname):
        f = file(fname)
        f.seek(0,2) # go to the end
        while True:
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                yield l

    def grep(lines, pattern):
        regex = re.compile(pattern)
        for l in lines:
            if regex.match(l):
                yield l

    def printer(lines):
        for l in lines:
            print l.strip()

    f = follow('/var/log/system.log')
    g = grep(f, ".*pants.*")
    p = printer(g)

    for i in p:
        pass

The question/challenge raised on the list was essentially "what does this look like in your framework or system?". For some reason, someone saw fit to move the mailing list from google groups, and delete the archives, so I can't point at the thread, but I did repost my answer for what was called "99 bottles" for kamaelia on the python wiki .

I quite liked the example for describing how to take this and convert it into a collection of kamaelia components, primarily because by doing so we gain a number of reusable components in this way. For me it was able describing how to move from something rather ad-hoc to something somewhat more generally usable.

For me, the point about Kamaelia is really that it's a component framework aimed at making concurrent problems more tractable & maintainable. Basically so that I can get stuff done quicker, that won't need rewriting completely to use concurrency, which someone else can hack on without needing to come back to me to understand it. In practice though, this also means that I tend to focus on building stuff, rather than asking "is it concurrent?". (Axon kinda ensures that it either is, or is concurrent friendly) This does sometimes also mean I focus on getting the job done, rather than "does this look nice"... Whilst that does matter to me, I do have deadlines like the next person :-)

For example, one thing missing from the above is that when you do something like:
    tail -f /var/log/system.log |grep pants
You aren't interested in the fact this uses 3 processes - tail, grep & parent process - but the fact that by writing it like this you're able to solve a problem quickly and simply. It also isn't particularly pretty, though I personally I view the shell version as rather elegant.

Naturally, being pleased with my version, I blogged about it. Much like anyone else, when I write something it seems like a good idea at the time :-). As sometimes happens, it made it onto reddit with some really nice & honest comments.

And what were those comments? If I had to summarise in one word "ugh!"

Whilst I don't aim for pretty (I aim for safe/correct :), pretty is nice, and pretty is fun. As a result, I've wanted to come back to this.Ugly is no fun :-( . Fun matters :-)

There was also a comment that suggested using decorators to achieve the same goal. However, at that point in time I had a mental block about what that would look like in this case. So I just thought "OK, I agree, can't quite see how to do it". I did recognise though that they're right to say that decorators would improve this case.

In particular the stumbling block is the way python generators are used in the above example is effectively a one way chaining. printer pulls values from grep. grep pulls values from follow. When one of them exits, they all exit. Essentially this is pull based.

In Kamaelia, components can be push, pull or push & pull. Furthermore they can push and pull in as many directions as you need. At the time mapping between the two sensibly it didn't seem tractable to me. Then this morning, as I woke blearily, I realised that the reason why. Essentially the above generator form isn't really directly the same as the shell form - though it is close.

Taking grep, for example, if I do this:
grep "foo" somefile
Then grep will open the file "somefile", read it, and output lines that match the pattern and exit.

However, if I do this:
bla | grep "foo"
Then grep will read values from stdin, and output lines which match the pattern. Furthermore, it will pause outputting values when bla stops pushing values into the chain, and exit when bla exits (after finishing processing stdin). ie It essentially has two modes of operating, based on getting a value or having an absent value.

In essence, the differences about what's happening here are subtle - in the shell we pass in a symbol which represents which stream needs opening, whereas in the example above, we pass in, effectively, an open stream. Also the shell is very much a combination of push and pull, whereas the generator pipeline above is essentially pull.

This made me realise that rather than activating the generator we want to read from *outside* the generator we're piping into, if we activate the generator *inside* the generator we're piping into, the problem becomes tractable.

For example, if we change this:
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines: # Note this requires an activate generator, or another iterable
        if regex.match(l):
            yield l
To this:
def grep(lines, pattern):
    "To stop this generator, you need to call it's .throw() method. The wrapper could do this"
    regex = re.compile(pattern)
    while 1:
        for l in lines(): # Note we activate the generator here inside instead
            if regex.search(l):
                yield l
        yield

We gain something that can operate very much like the command line grep. That is, it reads from its equivalent to stdin until stdin is exhausted. To indicated stdin is exhausted it simply yields - ie yields None. The caller can then go off and get more data to feed grep. Alternatively the caller can shutdown this grep at any point in time by throwing in an exception.

Making this small transform allows the above example to be rewritten as kamaelia components like this:
import sys
import time
import re
import Axon
from Kamaelia.Chassis.Pipeline import Pipeline
from decorators import blockingProducer, TransformerGenComponent

@blockingProducer
def follow(fname):
    "To stop this generator, you need to call it's .throw() method. The wrapper could do this"
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

@TransformerGenComponent
def grep(lines, pattern):
    "To stop this generator, you need to call it's .throw() method"
    regex = re.compile(pattern)
    while 1:
        for l in lines():
            if regex.search(l):
                yield l
        yield

@TransformerGenComponent
def printer(lines):
    "To stop this generator, you need to call it's .throw() method"
    while 1:
        for line in lines():
            sys.stdout.write(line)
            sys.stdout.flush()
        yield

Pipeline(
    follow('/var/log/system.log'),
    grep(None, ".*pants.*"),
    printer(None)
).run()

The implementation for both decorators.py and example.py above can both be found here:
http://code.google.com/p/kamaelia/source/browse/trunk/Sketches/MPS/AxonDecorators/
Similarly, if we wanted to use multiple processes, we could rewrite that final pipeline like this:
    from Axon.experimental.Process import ProcessPipeline

    ProcessPipeline(
        follow('/var/log/system.log'),
        grep(None, ".*pants.*"),
        printer(None)
    ).run()
Specifically the above will use 4 processes. One container process, and 3 subprocesses. (ProcessPipeline would benefit from a rewrite using multiprocess rather than pprocess though)

The other nice thing about this approach is that suppose you wanted to define your own generator source like this:
def source():
    for i in ["hello", "world", "game", "over"]:
        yield i
You could use that instead of "follow" above like this:
    Pipeline(
        grep(source,
".*pants.*"),
        printer(None)
    ).run()
For me, this has a certain symmetry with the change from this
tail somefile.txt | grep ".*pants.*" | cat -
to this:
grep ".*pants.*" source | cat -
ie if you pass in an absent value, it processes the standard inbox "inbox", rather than stdin. If you pass in a value, it's assumed to be a generator that needs activating.

Stepping back, and answering the "why? What does this give you?" question, it becomes more apparent as to why this might be useful when you start monitoring 5 log files at once for POST requests. For example, putting that all together in a single file would look like this:
(assuming you didn't reuse existing components :)
import sys
import time
import re
import Axon
from Kamaelia.Util.Backplane import Backplane, SubscribeTo, PublishTo
from Kamaelia.Chassis.Pipeline import Pipeline
from decorators import blockingProducer, TransformerGenComponent

@blockingProducer
def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

@TransformerGenComponent
def grep(lines, pattern):
    regex = re.compile(pattern)
    while 1:
        for l in lines():
            if regex.search(l):
                yield l
        yield

@TransformerGenComponent
def printer(lines):
    while 1:
        for line in lines():
            sys.stdout.write(line)
            sys.stdout.flush()
        yield

Backplane("RESULTS").activate()

for logfile in ["com.example.1", "com.example.2", "com.example.3","com.example.4","com.example.5"]:
    Pipeline(
        follow(logfile+"-access.log"),
        grep(None, "POST"),
        PublishTo("RESULTS")
    ).activate()

Pipeline(
    SubscribeTo("RESULTS"),
    printer(None)
).run()
Now, I don't particularly like the word pythonic - maybe it is, maybe it isn't - but hopefully this example does look better than perhaps than last time! The biggest area needing work, from my perspective, in this  example is the names of the decorators.

Since this will be going into the next release of Axon - any feedback - especially on naming - would be welcome :-).

(Incidentally, follow/grep have already been added to kamaelia, so this would really be simpler, but it does make an interesting example IMO :-)
blog comments powered by Disqus