Restarting Python Northwest. 24th Sept

September 14, 2009 at 11:36 PM | categories: python, oldblog

A few people will have already noticed some small comments about this, but we're plotting to restart python northwest. Specifically, we're restarting this month.

Details:
  • When: Thursday 24th September, 6pm
  • Who: Who can come? If you're reading this YOU can (assuming you're sufficiently close :-)
    More specifically anyone from beginners, the inexperienced through deeply experienced and all the way back to the plain py-curious.
  •  What: Suggestion is to start off with a social meet, and chat about stuff we've found interesting/useful/fun with python recently. Topics likely to include robots and audio generation, the recent unconference, and europython.

How did this happen? I tweeted the idea, a couple of others seconded it, then David Jones pointed out "it easier to arrange for a specific 2 people to meet than it was to e-mail a vague cloud of people and get _any_ 2 to meet anywhere.", so that's where we'll be.

If twitter feedback is anything to go by, we're hardly going to be alone, so please come along - the more the merrier :-) Better yet, please reply to this post saying you're coming along!

More generally, assuming this continues, pynw will probably be every third Thursday in the month, maybe alternating between technical meets and social ones (probably a topic for discussion :-).

Please forward this to anyone you think may be interested!

See you there!


Traffic Server to be Open Source?!

July 07, 2009 at 12:02 PM | categories: python, oldblog

If this happens this will be awesome. Traffic Server is some really nice code. It's a large codebase, but it's really cool, and it *scales*. (I used to work at Inktomi, so have been inside the code as well). For those that don't know what it is, it's a very high performance web caching proxy, with a plugin architecture, allowing for the addition of other protocols. It used to support HTTP (& obvious friends), NNTP, RTSP, RTP, WMV, etc.

That's pretty much made my day that has.

Europython Videos Transcoding

July 07, 2009 at 01:02 AM | categories: python, oldblog

Since I've had a few questions about this, a short status update. At Europython last week I was recording all the talks I was attending. Including the lightning talks this means I have video from 55 talks. The video files from the camera are too large for blip.tv, so I'm transcoding them down to a smaller size, before uploading them. Since these 55 talks are spread over nearly 80 files, that naturally takes time.

Fortunately/obviously, I'm automating this, and it'll come as no shock to some that I'm automating it using kamaelia. This automation needs to be stoppable, since I need to only do this overnight, for practicality reasons.

Anyway, for those curious, this is the code I'm using to do the transcode & upload. You'll note that it saturates my CPU, keeping both cores busy. Also, it's interleaving an IO bound process (ftp upload) with CPU bound - transcode.

import os
import re
import Axon

from Kamaelia.Chassis.Graphline import Graphline
from Kamaelia.Chassis.Pipeline import Pipeline

class Find(Axon.Component.component):
    path = "."
    walktype = "a"
    act_like_find = True
    def find(self, path = ".", walktype="a"):
        if walktype == "a":
            addfiles = True
            adddirs = True
        elif walktype == "f":
            addfiles = True
            adddirs = False
        elif walktype == "d":
            adddirs = True
            addfiles = False

        deque = []
        deque.insert(0,  (os.path.join(path,x) for x in os.listdir(path)) )
        while len(deque)>0:
            try:
                fullentry = deque[0].next()
                if os.path.isdir(fullentry):
                    if adddirs:
                        yield fullentry
                    try:
                        X= [os.path.join(fullentry,x) for x in os.listdir(fullentry)]
                        deque.insert(0, iter(X))
                    except OSError:
                        if not self.act_like_find:
                            raise
                elif os.path.isfile(fullentry):
                    if addfiles:
                        yield fullentry
            except StopIteration:

                deque.pop(0)

    def main(self):
        gotShutdown = False
        for e in self.find(path = self.path, walktype=self.walktype):
            self.send(e, "outbox")
            yield 1
            if self.dataReady("control"):
                gotShutdown = True
                break

        if not gotShutdown:
            self.send(Axon.Ipc.producerFinished(), "signal")
        else:
            self.send(self.recv("control"), "signal")

class Sort(Axon.Component.component):
    def main(self):
        dataset = []
        while 1:
            for i in self.Inbox("inbox"):
                dataset.append(i)
            if self.dataReady("control"):
                break
            self.pause()
            yield 1
        dataset.sort()
        for i in dataset:
            self.send(i, "outbox")
            yield 1
        self.send(self.recv("control"), "signal")

class Grep(Axon.Component.component):
    pattern = "."
    invert = False
    def main(self):
        match = re.compile(self.pattern)
        while 1:
            for i in self.Inbox("inbox"):
                if match.search(i):
                    if not self.invert:
                        self.send(i, "outbox")
                else:
                    if self.invert:
                        self.send(i, "outbox")
            if self.dataReady("control"):
                break
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

class TwoWayBalancer(Axon.Component.component):
    Outboxes=["outbox1", "outbox2", "signal1","signal2"]
    def main(self):
        c = 0
        while 1:
            yield 1
            for job in self.Inbox("inbox"):
                if c == 0:
                    dest = "outbox1"
                else:
                    dest = "outbox2"
                c = (c + 1) % 2

                self.send(job, dest)
                job = None
            if not self.anyReady():
                self.pause()
            if self.dataReady("control"):
                break
        R=self.recv("control")
        self.send(R, "signal1")
        self.send(R, "signal2")


class Transcoder(Axon.ThreadedComponent.threadedcomponent):
    command = 'ffmpeg >transcode.log 2>&1 -i "%(SOURCEFILE)s" -s 640x360 -vcodec mpeg4 -acodec copy -vb 1500000 %(ENCODINGNAME)s'
    def main(self):
        while 1:
            for sourcefile in self.Inbox("inbox"):
                shortname = os.path.basename(sourcefile)
                encoding_name = shortname.replace(".mp4", ".avi")
                finalname = sourcefile.replace(".mp4", ".avi")
                # Do the actual transcode
                print "TRANSCODING", sourcefile, encoding_name
                os.system( self.command % {"SOURCEFILE": sourcefile, "ENCODINGNAME":encoding_name})

                # file is transcoded, move to done
                print "MOVING DONE FILE", sourcefile, os.path.join("done", sourcefile)
                os.rename(sourcefile, os.path.join("done", sourcefile))

                # Move encoded version to upload queue
                upload_name = os.path.join( "to_upload", encoding_name)
                print "MOVING TO UPLOAD QUEUE", encoding_name, upload_name
                os.rename(encoding_name, upload_name )

                # And tell the encoder to upload it please
                print "SETTING OFF UPLOAD",upload_name, finalname
                self.send( (upload_name, finalname), "outbox")
                print "-----------------"
            if self.dataReady("control"):
                break
        self.send(self.recv("control"), "signal")

class Uploader(Axon.ThreadedComponent.threadedcomponent):
    command = "ftpput --server=%(HOSTNAME)s --verbose --user=%(USERNAME)s --pass=%(PASSWORD)s --binary --passive %(UPLOADFILE)s"
    username = "< edited :-) >"
    password = "< edited :-) >"
    hostname = "ftp.blip.tv"
    def main(self):
        while 1:
            for (upload_name, finalname) in self.Inbox("inbox"):
                print "UPLOADING", upload_name
                os.system( self.command % {
                                        "HOSTNAME":self.hostname,
                                        "USERNAME":self.username,
                                        "PASSWORD":self.password,
                                        "UPLOADFILE":upload_name,
                                     } )
                print "MOVING", upload_name, "TO", os.path.join("encoded", finalname)
                os.rename(upload_name, os.path.join("encoded", finalname))
                print "-----------------"

            if self.dataReady("control"):
                break
            if not self.anyReady():
                self.pause()
        self.send(self.recv("control"), "signal")

Graphline(
    FILES = Pipeline(
                Find(path=".",walktype="f"),
                Sort(),
                Grep(pattern="(done|encoded|unsorted|transcode.log|to_upload)",
                     invert = True),
            ),
    SPLIT = TwoWayBalancer(), # Would probably be nicer as a customised PAR chassis
    CONSUME1 = Pipeline(
                    Transcoder(),
                    Uploader(),
               ),
    CONSUME2 = Pipeline(
                    Transcoder(),
                    Uploader(),
               ),
    linkages = {
        ("FILES","outbox"):("SPLIT","inbox"),
        ("SPLIT","outbox1"):("CONSUME1","inbox"),
        ("SPLIT","outbox2"):("CONSUME2","inbox"),

        ("FILES","signal"):("SPLIT","control"),
        ("SPLIT","signal1"):("CONSUME1","control"),
        ("SPLIT","signal2"):("CONSUME2","control"),
    }
).run()

It should be fairly clear that this will go as fast as it can, so please be patient :-)
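For anyone who wants the shape of the above without installing Kamaelia, here is a rough sketch of the same idea using only the standard library: jobs are dealt out round-robin to two workers, and each worker does a "transcode" followed by an "upload". It is written in Python 3 syntax, and transcode(), upload() and run_jobs() are made-up stand-ins for illustration, not the ffmpeg/ftpput calls or anything from Kamaelia.

```python
import queue
import threading


def transcode(name):
    # Stand-in for the CPU bound ffmpeg step: only derives the output name.
    return name.replace(".mp4", ".avi")


def upload(name):
    # Stand-in for the IO bound ftp step.
    return "uploaded:" + name


def worker(jobs, results):
    # Each worker transcodes then uploads, like one CONSUME pipeline.
    while True:
        job = jobs.get()
        if job is None:          # None is the shutdown message
            break
        results.put(upload(transcode(job)))


def run_jobs(files, nworkers=2):
    files = sorted(files)
    job_queues = [queue.Queue() for _ in range(nworkers)]
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(q, results))
               for q in job_queues]
    for t in threads:
        t.start()
    # Round-robin dispatch, as TwoWayBalancer does with its two outboxes.
    for i, name in enumerate(files):
        job_queues[i % nworkers].put(name)
    # Pass shutdown on to every worker once all jobs are queued.
    for q in job_queues:
        q.put(None)
    for t in threads:
        t.join()
    return sorted(results.get() for _ in files)


print(run_jobs(["b.mp4", "a.mp4", "c.mp4"]))
```

The None sentinel plays the role of the shutdown message passed from "signal" to "control" in the component version.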



Autoloading in python

June 21, 2009 at 04:14 PM | categories: python, oldblog

Before I started using python, I'd used perl for several years, and one thing which I'd liked about perl was its autoload facility. Now in python the closest equivalent that I've seen is __getattr__ for classes, but not __getattr__ for a module. This seemed like a real shame since there are times when autoload can be incredibly useful.
If it seems chaotic, consider the Unix PATH variable. Any time you type a name, the shell looks in lots of locations and runs the first one it finds. That's effectively the same sort of idea as autoloading. Yes, you can do some really nasty magic if you want, but then you can do that with the shell too, and generally people get along fine.
Anyway, vaguely curious about it I decided to do some digging around, and came across this post by Leif K Brookes, which suggests this:
You could wrap it in an object, but that's a bit of a hack.

import sys

class Foo(object):
     def __init__(self, wrapped):
         self.wrapped = wrapped

     def __getattr__(self, name):
         try:
             return getattr(self.wrapped, name)
         except AttributeError:
             return 'default'

sys.modules[__name__] = Foo(sys.modules[__name__])

That looked reasonable, so I created a file mymod.py which looks like this:
import sys

def greet(greeting="Hello World"):
   print greeting

class mymod_proxy(object):
    def __init__(self):
        super(mymod_proxy, self).__init__()
        self.wrapped = sys.modules["mymod"]
    def __getattr__(self, name):
        try:
            return getattr(self.wrapped, name)
        except AttributeError:
            def f():
                greet(name)
            return f

sys.modules["mymod"] = mymod_proxy()
And tried using it like this:
~> python
Python 2.5.1 (r251:54863, Jan 10 2008, 18:01:57)
[GCC 4.2.1 (SUSE Linux)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import mymod
>>> mymod.hello()
hello
>>> from mymod import Hello_World
>>> Hello_World()
Hello_World
And as you can see, it seems to work as expected/desired.
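For readers on newer interpreters: Python 3.7 and later let a module define its own __getattr__ function (PEP 562), which gives much the same effect without swapping the entry in sys.modules for a proxy object. A minimal sketch, in Python 3 syntax rather than the Python 2 used above; the module is built in memory, and the name mymod_demo is made up purely for illustration:

```python
import sys
import types


def greet(greeting="Hello World"):
    return greeting


def _module_getattr(name):
    # Called only when normal module attribute lookup fails.
    if name.startswith("__"):
        # Leave dunder lookups alone so import machinery is not confused.
        raise AttributeError(name)

    def f():
        return greet(name)
    return f


# Build the module in memory instead of writing a mymod_demo.py file.
mymod = types.ModuleType("mymod_demo")
mymod.greet = greet
mymod.__getattr__ = _module_getattr
sys.modules["mymod_demo"] = mymod

import mymod_demo
from mymod_demo import Hello_World

print(mymod_demo.hello())
print(Hello_World())
```

In a real module you would simply write def __getattr__(name): at the top level of the file.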

Now the reason I'd been thinking about this, is because I'd like to retain the hierarchy of components in Kamaelia that we have at the moment (it's useful for navigating what's where), but given we tend to use them in a similar way to Unix pipelines it's natural to want to be able to do something like:
from Kamaelia import Pipeline, ConsoleReader, ConsoleWriter
Pipeline(
    ConsoleReader(),
    ConsoleWriter(),
).run()
Rather than the more verbose form specifically pulling them in from particular points. Likewise, we don't really want to import every single module in Kamaelia.py, because of the large number of components there (Kamaelia is really a toolbox IMO where things get wired together, and Axon is the tool for making new tools), the majority of which won't be used in every application!
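To make the "don't import everything up front" idea concrete, here is a small sketch of a lazily populated namespace. It assumes a hand-written table mapping each exported name to the module that defines it; the LazyNamespace class and the table are hypothetical, and standard library modules stand in for Kamaelia's component modules (Python 3 syntax):

```python
import importlib


class LazyNamespace(object):
    """Import the module behind a name only when the name is first used."""

    def __init__(self, locations):
        # locations maps an exported name to the module that defines it.
        self._locations = dict(locations)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        locations = self.__dict__.get("_locations", {})
        if name not in locations:
            raise AttributeError(name)
        module = importlib.import_module(locations[name])
        value = getattr(module, name)
        # Cache the result so later lookups bypass __getattr__ entirely.
        setattr(self, name, value)
        return value


toolbox = LazyNamespace({"sqrt": "math", "OrderedDict": "collections"})
print(toolbox.sqrt(9.0))
```

Only the modules behind names that are actually used get imported, which is the property wanted for a large toolbox.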

Now, I haven't done this yet, and wouldn't do it lightly, but the fact that you can actually make autoload functionality work seems kinda cool, and a nice opportunity. But I'm also now wondering just how nasty this approach seems to people. After all, Leif describes it as "a bit of a hack", and whilst it's neat, I'm not taking in the positive view. I'm interested in any views on better ways of doing Autoload in python, and also whether people view it as a nice thing at all. (One person's aesthetic is another person's pain after all...)

Europython

June 10, 2009 at 05:35 PM | categories: python, oldblog

 I've mentioned this in a couple of places, but mentioning on my blog seems appropriate too.

I'm giving a tutorial on Kamaelia at Europython '09 this year.

Europython details:
   Where: Birmingham UK
   When:
      Tutorial days 28/29th June.
      Main conference: 30th June - 2nd July
      Kamaelia specifically: 28th June, 9am
            http://www.europython.eu/talks/timetable/
   Cost:
       Tutorial days: £100
       Conference days: £190
   More info:
       http://www.europython.eu/
       http://www.europython.eu/talks/timetable/

Blurb for the Kamaelia tutorial:
Kamaelia: Pragmatic Concurrency

Tutorial, Half day (intermediate)

Why use concurrency? Since concurrency is viewed as an advanced topic by many developers, this question is often overlooked. However, many real world systems, including transportation, companies, electronics and Unix systems are highly concurrent and accessible by the majority of people. One motivation can be “many hands make light work” but in software this often appears to be false – in no small part due to the tools we use to create systems. Despite this, the need for concurrency often creeps into many systems.

Kamaelia is a toolset and mindset aimed at assisting in structuring your code such that you can focus on the problem you want to solve, but in a way that results in naturally reusable code that happens to be painlessly concurrent. It was designed originally to make maintenance of highly concurrent network systems simpler, but has general application in a wider variety of problem domains, including desktop applications, web backend systems (eg video transcode & SMS services), through to tools for teaching a child to read and write.

This tutorial will cover:
  • A fast overview in the style of a lightning talk.
  • Kamaelia's core – Axon – which provides the basic tools needed for concurrent systems, followed by a session on implementing your own core.
  • Practical walk throughs of real world Kamaelia systems to demonstrate how to build and design systems of your own.
  • More advanced concepts such as reusing centralised services and constrained data sharing, as well as ongoing open issues will be touched upon.
  • Tips, tricks and rules of thumb when working with concurrent systems.
This is a highly practical tutorial, where you will create your own version of Axon, your own components and your first Kamaelia based system (bring a laptop!). The course expects a level of familiarity with Python but no prior experience of concurrency is assumed.
The structure of this in terms of time is 2 x 1.5 hour sessions, with a 15 minute break in the middle, so hopefully enough time to impart enough useful knowledge to help you get started with Kamaelia.

Also, if Kamaelia isn't interesting to you (sob :),  Ali Afshar who hangs out on Kamaelia's IRC channel is also giving a tutorial there on PyGTK, along with lots of other people giving interesting tutorials and talks :-)

New Tool for helping debugging Kamaelia Systems

June 08, 2009 at 06:17 PM | categories: python, oldblog

I've added a PythonInterpreter component to Kamaelia in Kamaelia.Experimental.PythonInterpreter. The primary use for this is intended to assist with debugging, but there are other possible uses.

Use in a system

This allows a variety of things, from a basic command line console:
StandaloneInterpreter().run()
Which you can run in the background on any console. For example you could do this:
    ServerCore(protocol=myprotocol, port=1234).activate()
    StandaloneInterpreter().run()
Alternatively, you can use an embeddable component that speaks to inbox/outbox rather than stdin/stdout. Crudely you can do something like this:
    Pipeline(
        ConsoleReader(),
        InterpreterTransformer(),
        ConsoleEchoer(),
    ).run()
But you can also put it inside a pygame application, reading & writing from a Textbox/TextDisplayer:
    Pipeline(
        Textbox(size = (800, 300), position = (100,380)),
        InterpreterTransformer(),
        TextDisplayer(size = (800, 300), position = (100,40)),
    ).run()

This looks like this:


The interesting option this opens up of course is the fact that you can add in a networked interpreter (note: this is hideously insecure incidentally - so only safe on a network you completely control!) into any application:
    from Kamaelia.Chassis.Pipeline import Pipeline
    from Kamaelia.Chassis.ConnectedServer import ServerCore
    from Kamaelia.Util.PureTransformer import PureTransformer
   
    from Kamaelia.Experimental.PythonInterpreter import InterpreterTransformer
   
    def NetInterpreter(*args, **argv):
        return Pipeline(
                    PureTransformer(lambda x: str(x).rstrip()),
                    InterpreterTransformer(),
                    PureTransformer(lambda x: str(x)+"\r\n>>> "),
               )
   
    ServerCore(protocol=NetInterpreter, port=1236).run()

In one console:
    # ./ServerBasedPythonInterpreter.py
In another:
    ~> telnet 127.0.0.1 1236
    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
   
    >>> self
    Component
Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer_21 [ inboxes :
{'control': [], 'inbox': []} outboxes : {'outbox': [], 'signal': []}
    >>> "hello world"
    hello world
    >>> Axon
    <module 'Axon'
from '/usr/local/lib/python2.5/site-packages/Axon/__init__.pyc'>
    >>> dir(self)
    ['Inbox', 'Inboxes', 'Outboxes', 'Usescomponents', '_AxonObject_...
    >>> self.scheduler
    Axon.Scheduler.scheduler_1 :1 :1 :0 :
    >>> self.scheduler.threads
{<Kamaelia.Internet.TCPServer.TCPServer object at 0xb7b972cc>: <object object
at 0xb7c92468>,
<Kamaelia.Internet.ConnectedSocketAdapter.ConnectedSocketAdapter object at
0xb7ba4acc>: <object object at 0xb7c92468>,
<Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer object at
0xb7ba6bec>: <object object at 0xb7c92468>,
<Kamaelia.Chassis.Pipeline.Pipeline object at 0xb7bafc2c>: <object object at
0xb7c92468>, <Kamaelia.Experimental.PythonInterpreter.InterpreterTransformer
object at 0xb7b9dc4c>: <object object at 0xb7c92468>,
<Kamaelia.Util.PureTransformer.PureTransformer object at 0xb7baf86c>: <object
object at 0xb7c92460>, <Kamaelia.Chassis.ConnectedServer.ServerCore object at
0xb7b8acec>: <object object at 0xb7c92468>,
<Kamaelia.Chassis.Pipeline.Pipeline object at 0xb7ba1d0c>: <object object at
0xb7c92468>, <Kamaelia.Util.PureTransformer.PureTransformer object at
0xb7ba192c>: <object object at 0xb7c92468>,
<Kamaelia.Util.PureTransformer.PureTransformer object at 0xb7b9d96c>: <object
object at 0xb7c92468>, <Kamaelia.Internet.Selector.Selector object at
0xb7b979cc>: <object object at 0xb7c92468>}

etc.
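The line-in, text-out behaviour seen in the session above can be approximated in a few lines of plain Python. This is only a sketch of the general idea, not how InterpreterTransformer is actually implemented: the interpret() function and its env argument are names invented here, and it is Python 3 syntax.

```python
def interpret(line, env):
    """Evaluate one line of source against env and return text to send back."""
    try:
        # Expressions produce a value that can be shown to the user.
        code = compile(line, "<input>", "eval")
    except SyntaxError:
        # Not an expression: try it as a statement instead.
        try:
            exec(compile(line, "<input>", "exec"), env)
        except Exception as e:
            return repr(e)
        return "ok"
    try:
        return str(eval(code, env))
    except Exception as e:
        return repr(e)


env = {}
print(interpret("x = 2", env))
print(interpret("x + 1", env))
print(interpret('"hello world"', env))
```

As with the networked version, anything reaching interpret() runs with the full privileges of the process, so the same security warning applies.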

Rummaging Around inside Running Systems

The idea behind this component really is to assist in debugging live running systems by directly rummaging around inside them, which is why this form is probably most useful:
    if interactive_debug:
        StandaloneInterpreter().activate()
   
    MyMainApplication().run()
And whilst the other forms naturally drop out as useful, this form is probably the safest of the bunch!

This form also is slightly more flexible, in that it allows this sort of thing:
    ~/> ./BasicPythonInterpreter.py
    > from Kamaelia.UI.Pygame.Ticker import Ticker

    ok
    > X=Ticker()

    ok
    Component Kamaelia.UI.Pygame.Ticker.Ticker_7 [ inboxes : {'control':
[], '_displaycontrol': [], 'unpausebox': [], 'pausebox': [], 'inbox':
[], 'alphacontrol': []} outboxes : {'outbox': <>, 'signal':
<>, '_displaysignal': <>}
> X.activate()

    ok
    Component Kamaelia.UI.Pygame.Ticker.Ticker_7 [ inboxes : {'control':
[], '_displaycontrol': [], 'unpausebox': [], 'pausebox': [], 'inbox':
[''], 'alphacontrol': []} outboxes : {'outbox': <>, 'signal':
<>, '_displaysignal': <>}
> self.link((self, "outbox"), (X,"inbox"))

    ok
    Link( source:
[Kamaelia.Experimental.PythonInterpreter.StandaloneInterpreter_5,outbox],
sink:[Kamaelia.UI.Pygame.Ticker.Ticker_7,inbox] )
> self.send("Hello", "outbox")

    ok

As you can guess this then results in the text displayer outputting to the display.

As another example, if you're rummaging inside a system, you could start up an introspector in another console:
   ./AxonVisualiser.py --port 1500
And then in the embedded interpreter do this:
> from Kamaelia.Util.Introspector import Introspector

ok
> from Kamaelia.Chassis.Pipeline import Pipeline

ok
> from Kamaelia.Internet.TCPClient import TCPClient

ok
> Pipeline(
    Introspector(),
    TCPClient("127.0.0.1", 1500),
).activate()
> > >
ok
And that then adds an Axon visualisation/introspector to a running system - which looks like this:



Clearly this can aid in identifying when something has gone wrong.

Hope others find it useful too :-)


Going from generator coroutines to Kamaelia Components

May 17, 2009 at 02:09 AM | categories: python, oldblog

Earlier this evening an announcement by Pete Fein regarding the formation of a python-concurrency mailing list (aka Python Concurrency Special Interest Group) bounced past my inbox in the unladen-swallow mailing list. Naturally, given my work on Axon (a concurrency library) and Kamaelia (a bunch of components that use it), it jumped out at me as interesting. (5 minute overview for those that don't know what Kamaelia is...)

Pete then posted to the list a small collection of different ways of implementing...
#!/bin/sh
tail -f /var/log/system.log |grep pants
... in python.

This struck me as a nice example worth comparing to Kamaelia - specifically to compare the generator version. Indeed the Kamaelia version is fairly directly derived from the generator version, which is why I'm picking on it. The full generator version looks like this:
import time
import re

def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

def printer(lines):
    for l in lines:
        print l.strip()

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass

Now, this is nice, but the core logic:
tail -f /var/log/system.log |grep pants
Has somehow become obfuscated at the end - which is a shame, because it doesn't need to be. Taking the Kamaelia version step by step, let's take that end part:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass
... and turn it into something which looks like what you'd do in Kamaelia:
from Kamaelia.Chassis.Pipeline import Pipeline

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()
(incidentally, to actually use separate processes, ala shell, you'd use ProcessPipeline, not Pipeline)

So in order for this to be valid, we now need to adapt follow into Follow, grep into Grep, and printer into Printer.

Let's start by adapting follow:
def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l
First of all, we note that this follow source blocks - specifically calling time.sleep. Now there are other ways of doing this, but the simplest way of keeping this code structure is to just create a threaded component. We also need to capture the argument (fname) which isn't optional and has no logical default, so we need to have an __init__ method. So we grab that, store it somewhere handy, and call the super class initialiser. Pretty standard stuff.
import Axon
class Follow(Axon.ThreadedComponent.threadedcomponent):
    def __init__(self, fname, **argv):
        self.fname = fname
        super(Follow,self).__init__(**argv)
    def main(self):
        f = file(self.fname)
        f.seek(0,2) # go to the end

        while not self.dataReady("control"):
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                self.send(l, "outbox")

        f.close()

        self.send(self.recv("control"), "signal")
The main() method of this component now follows the same core logic as the follow() generator. The logic added around it is simply control logic, which is fairly commonly used and looks like this:
        while not self.dataReady("control"):
            <Body of generator>
        self.send(self.recv("control"), "signal")
This allows someone to shutdown our component cleanly. Other than that, the other major difference is that this:
            yield l
Has been replaced with this:
            self.send(l, "outbox")

ie rather than expecting to be called in a "call me & handle my values" loop, it just passes its results out of an outbox called "outbox". (much like tail -f spits its results out of stdout)

The next part to convert is grep(). Recalling this, this looks like this:
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l
With the Kamaelia equivalent looking like this:
class Grep(Axon.Component.component):
    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."
    def main(self):
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
           for l in self.Inbox("inbox"):
               if regex.match(l):
                   self.send(l, "outbox")

           self.pause()
           yield 1
        self.send(self.recv("control"), "signal")

Once again the overall logic is unchanged, and again the only addition is the shutdown logic.

However, we don't need to explicitly pass in a location for lines to enter this component. Specifically we just expect to be passed lines from our standard inbox "inbox", in the same way normal grep expects data on stdin.

Also, because we have a logical default for the pattern (match everything using "."), we don't need to have an __init__ method. The reason for this is because the baseclass for all components does this in its __init__ method:
   def __init__(self, *args, **argd):
..
      self.__dict__.update(argd)

So calling Grep(pattern=".*pants.*") will specialise the component value of pattern (The reason for this is it actively enables monkey-patching as reconfiguration, as you can see in this greylisting mail server).
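The mechanism is easy to see in isolation. A minimal sketch, with a toy base class standing in for the real component base class (ToyComponent is a made-up name and does nothing except the update shown above):

```python
class ToyComponent(object):
    def __init__(self, **argd):
        # Keyword arguments become instance attributes, shadowing
        # any class level default of the same name.
        self.__dict__.update(argd)


class Grep(ToyComponent):
    # Class level default, used when no pattern keyword is given.
    pattern = "."


default = Grep()
custom = Grep(pattern=".*pants.*")

print(default.pattern)
print(custom.pattern)
print(Grep.pattern)
```

The class attribute itself is untouched, so every other instance still sees the default.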

Again Grep is actually a special case of a filter, and as a result really just wants to take data from its standard inbox and pass matching stuff to its standard outbox. Which, if you look at the core logic, is precisely what it does:
           for l in self.Inbox("inbox"):
               if regex.match(l):
                   self.send(l, "outbox")


After clearing its inbox, it pauses, and releases control to the scheduler (Pausing is a component's way of letting the scheduler know "don't call me unless there's some data for me to work on").
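To show how "send, pause, yield" fits together with a scheduler, here is a deliberately tiny toy model in Python 3 syntax. None of this is Axon's real API or implementation: ToyComponent, Source, ToyGrep and run_pipeline are invented for illustration, each component has one inbox and one outbox, and messages are copied between them once per pass.

```python
import re


class ToyComponent(object):
    def __init__(self):
        self.inbox = []
        self.outbox = []
        self.paused = False

    def send(self, item):
        self.outbox.append(item)

    def pause(self):
        # Ask not to be run again until there is something in the inbox.
        self.paused = True


class Source(ToyComponent):
    def __init__(self, lines):
        super().__init__()
        self.lines = lines

    def main(self):
        for line in self.lines:
            self.send(line)   # where a plain generator would "yield line"
            yield 1           # hand control back to the scheduler


class ToyGrep(ToyComponent):
    def __init__(self, pattern):
        super().__init__()
        self.pattern = pattern

    def main(self):
        regex = re.compile(self.pattern)
        while True:
            while self.inbox:
                line = self.inbox.pop(0)
                if regex.match(line):
                    self.send(line)
            self.pause()
            yield 1


def run_pipeline(*components):
    tasks = [c.main() for c in components]
    finished = set()
    # Keep going until the source is done and every inbox is drained.
    while 0 not in finished or any(c.inbox for c in components):
        for i, c in enumerate(components):
            if i in finished or (c.paused and not c.inbox):
                continue      # finished, or paused with nothing to do
            c.paused = False
            try:
                next(tasks[i])
            except StopIteration:
                finished.add(i)
        # Deliver each outbox to the next component's inbox.
        for a, b in zip(components, components[1:]):
            b.inbox.extend(a.outbox)
            del a.outbox[:]
    return list(components[-1].outbox)


print(run_pipeline(Source(["a pants", "b", "pants c"]), ToyGrep(".*pants.*")))
```

Shutdown messages are left out to keep the sketch short; the toy scheduler simply stops when the source has finished and nothing is left to process.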

Finally, we need to implement the Printer() component. Again we can adapt this generator based version, thus:
def printer(lines):
    for l in lines:
        print l.strip()
Which transforms into this:
class Printer(Axon.Component.component):
    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                print l.strip()

            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

Again the core logic is the same, and the only addition is the control logic. Again, rather than needing the explicit "lines" iterator, we have a standard place for data to come into the component - the inbox "inbox" - leaving the rest of the logic essentially unchanged.
Finally, in the same way the generator version puts the whole thing together:
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass
We can put the Kamaelia components together:
Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()
This is very different. The generator version creates a chain of iterators, passing them in as the first parameter to the next one in the chain, with the last item in the chain (the for loop) being the one that pulls things along. The Kamaelia version instantiates a Pipeline component which is very clearly piping the outputs from one component into the inputs of the following one.

In fact the Pipeline component creates linkages between the components in the pipeline. These are conceptually similar to stackless's channels (though inspired by unix pipes and occam). Implementation-wise, a linkage actually collapses an inbox into an outbox. As a result, when Follow sends a message to its outbox "outbox", that message is instantly delivered into the inbox "inbox" of Grep. Inboxes for generator based components are just plain old lists, and for threaded components they're threadsafe queues.
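One way to picture "collapsing an inbox into an outbox" is two names pointing at the same list. The following is a conceptual sketch only, not Axon's actual data structures; ToyBoxes and link() are invented names, in Python 3 syntax:

```python
class ToyBoxes(object):
    def __init__(self):
        self.inboxes = {"inbox": []}
        self.outboxes = {"outbox": []}

    def send(self, item, boxname="outbox"):
        self.outboxes[boxname].append(item)


def link(source, sink):
    # After linking, the source's outbox *is* the sink's inbox,
    # so nothing needs to copy messages from one to the other.
    source.outboxes["outbox"] = sink.inboxes["inbox"]


follow = ToyBoxes()
grep = ToyBoxes()
link(follow, grep)

follow.send("some pants in a log line")
print(grep.inboxes["inbox"])
```

Because both names refer to one list, delivery costs nothing beyond the append itself.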

The nice thing about this, is that if you wanted to use this in a network server, you could do this:

from Kamaelia.Chassis.ConnectedServer import ServerCore

def FollowPantsInMyLogs(*args):
    return Pipeline(
             Follow('/var/log/system.log'),
             Grep(".*pants.*"),
           )

ServerCore(protocol=FollowPantsInMyLogs, port=8000).run()
Though that opens a Follow & Grep component for every client. To use just one Follow client, you could just follow and publish the data for all clients:

from Kamaelia.Util.Backplane import *

Backplane("PANTSINMYLOGS").activate()

Pipeline(
      Follow('/var/log/system.log'),
      Grep(".*pants.*"),
      PublishTo("PANTSINMYLOGS"),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()

You could also monitor this on the server console by adding in a local printer:
Backplane("PANTSINMYLOGS").activate()

Pipeline(
      Follow('/var/log/system.log'),
      Grep(".*pants.*"),
      PublishTo("PANTSINMYLOGS"),
).activate()

Pipeline(
      SubscribeTo("PANTSINMYLOGS"),
      Printer(),
).activate()

def clientProtocol(*args):
    return SubscribeTo("PANTSINMYLOGS")

ServerCore(protocol=clientProtocol, port=8000).run()
... which all fairly naturally describes the higher level co-ordination going on. Now you can do all this from the ground up using plain old generators, but personally I find this approach easier to follow. Some people find other approaches simpler :)
Anyway, for the curious reader, I hope this is a useful/interesting comparison of a non-kamaelia based approach with a kamaelia based approach, with a visible demonstration at the end of why I prefer the latter :) ... and all this ignores graphlines too :)


Bar Camp Leeds UK

May 05, 2009 at 09:56 PM | categories: python, oldblog | View Comments

Bar Camp Leeds (UK) is now in its 3rd year. It'd be great to see other Python people there if you're around. Details:


Schedulers matter

March 04, 2009 at 10:26 AM | categories: python, oldblog | View Comments

Improving the scheduler. It's been something we've avoided for quite a while in Kamaelia, but Simon Wittber's recent post benchmarking Fibra vs Kamaelia vs stackless is interesting. His key finding, that "Stackless is 7x faster than Fibra, and Fibra is 10x faster than Kamaelia", is cool for him :-) ... and naturally led me to ask "why?". Looking at the code, it struck me that he's doing something more interesting with the scheduler, given these code forms:
scheduler.install(self.messageLoop())
# self.messageLoop is a regular python generator
...
            yield self.incrementCounter()
            yield kickTo.messageQueue.push(self)
If you delve inside the fibra scheduler (which doesn't appear to be here, unexpectedly) you see the following core:
    def tick(self):
        """Iterates the scheduler, running all tasks and calling all
        handlers.
        """
        active = False
        for handler in self.handlers:
            active = handler.is_waiting() or active
        active = (len(self.tasks) > 0) or active
        tasks = []
        while True:
            try:
                task, send_value = self.tasks.pop(0)
            except IndexError, e:
                break
            try:
                if isinstance(send_value, Exception):
                    r = task.throw(send_value)
                else:
                    r = task.send(send_value)
            except StopIteration, e:
                r = e
            if r is None:
                tasks.append((task, None))
            else:
                try:
                    handler = self.type_handlers[type(r)]
                except KeyError:
                    raise RuntimeError("Don't know what to do with yielded type: %s" % type(r))
                handler.handle(r, task)
        self.tasks[:] = tasks
        return active
The core of this appears to be "when I'm done, do this next". If you think that's familiar, it should be - it's very similar (at least conceptually) to what twisted does with deferreds. It's not identical, but similar. So what does this mean for the hackysack demo? Well, if we look at self.tasks after each run, by changing:
    def run(self):
        while self.tick():
            pass
to:
    def run(self):
        while self.tick():
            print "TASKS", [x[0] for x in self.tasks]
It becomes apparent what's happening (though it's fairly obvious from above):
TASKS [<generator object at 0xb7b04fcc>]
TASKS []
TASKS [<generator object at 0xb780f94c>]
TASKS []
TASKS [<generator object at 0xb7a1b04c>]
TASKS []
TASKS [<generator object at 0xb776fcac>]
TASKS []
TASKS [<generator object at 0xb79327ac>]
TASKS []
TASKS [<generator object at 0xb7b0ff2c>]
TASKS []
Fundamentally, it's quicker for three reasons:
  • It always knows which generator is ready to run next.
  • It also defaults to pausing the generator, unless it specifically asks to be run. ie the tasks default to being passive.
  • The sender knows who the receiver is. This allows the sender to schedule the receiver explicitly.
These three points matter because Axon's scheduler (from Jesse's post, hopefully people are aware that Axon is the relevant part of kamaelia here):
  • Is effectively a round robin scheduler - essentially for simplicity. The key question we wanted to ask was "does a component model based around inboxes/outboxes make it easier to write/reuse concurrent software", rather than "how can we build a faster, more responsive scheduler". As a result the round robin scheduler was always a compromise. There's a little bit of intelligence in there, but not much.
  • Kamaelia's components default to active, rather than passive. That is they're expected to run continuously unless explicitly paused. This design decision impacts the scheduler as a whole.
  • The sender does not know who the receiver is. This requires something different to happen in the scheduler. On the upside, it means that code can be tested more easily in isolation, or reused in situations it wasn't originally expected to be used in.
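To make the difference concrete, here is a toy comparison of the two styles, counting how many times generators get resumed while a "ball" is passed around a ring of players. Both functions are hypothetical sketches written for this comparison: neither is fibra's nor Axon's actual scheduler, and the ring-passing rule is an assumption chosen only to keep the example deterministic.

```python
from collections import deque


def round_robin(n_players, n_passes):
    """Active by default: every player is resumed on every sweep."""
    inboxes = [[] for _ in range(n_players)]
    resumptions = [0]   # one-element lists so the inner generator can update them
    passes_done = [0]

    def player(index):
        while True:
            resumptions[0] += 1
            if inboxes[index]:
                inboxes[index].pop(0)
                passes_done[0] += 1
                # Drop the ball in a neighbour's inbox; nobody is woken up.
                inboxes[(index - 1) % n_players].append("ball")
            yield

    players = [player(i) for i in range(n_players)]
    inboxes[0].append("ball")
    while passes_done[0] < n_passes:
        for p in players:   # sweep everyone, whether or not they have work
            next(p)
    return resumptions[0]


def explicit_wakeup(n_players, n_passes):
    """Passive by default: only the player holding the ball is queued."""
    resumptions = [0]
    ready = deque()

    def player(index):
        while True:
            resumptions[0] += 1
            # The sender knows its receiver, so it schedules it directly.
            ready.append((index - 1) % n_players)
            yield

    players = [player(i) for i in range(n_players)]
    ready.append(0)
    passes_done = 0
    while passes_done < n_passes:
        next(players[ready.popleft()])
        passes_done += 1
    return resumptions[0]


rr_resumptions = round_robin(10, 100)
direct_resumptions = explicit_wakeup(10, 100)
```

In the explicit version every resumption does useful work, whereas the round robin sweep spends most of its resumptions on players with empty inboxes. The trade-off is the one described in the list above: the explicit version only works because each sender names its receiver.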
This leads me to the rather obvious question here - can we rebuild a better Axon scheduler by reusing fibra and throwing away our scheduler? If so that would be really neat - throwing away our scheduler for something faster and more intelligent would be rather fun :) If we can't, then we've been pondering improving it - that is making it more intelligent - for a while. Fibra's benchmarks suggest that doing so would be well worth it. The question this raises though is whether doing this would help us in the general case. At present I'm unclear on that, but until you try, you don't know.

Beyond all that though, fibra looks neat :)

The 5 minute (version), not the full half hour...

February 26, 2009 at 11:15 PM | categories: python, oldblog | View Comments

As you may know Kamaelia is a project I'm rather familiar with (ok understatement), but one of the key ideas behind it is about how to make concurrency usable for everyday problems (from greylisting, database modelling assistance, through to learning to read & write). No one concurrency method suits everyone - as Jesse's recent series of posts shows (if you've not read them they're well worth it :), but Kamaelia's approach fits my head, and hopefully yours too. There may be a few areas where nicer syntactic sugar may be appropriate and so on, and feedback is always welcome - preferably to the project's google group/mailing list.

OK, the point of this post is that recently at O'Reilly Ignite UK North I gave a 5 minute talk which was essentially "what we've learnt about practical concurrency", and by necessity of time it's a whistle stop tour. The slides are on slideshare, and the video is on blip.tv - along with the rest of the talks - and by the magic of the internet they're below as well.

Embracing Concurrency (slides)

Embracing Concurrency (video)

Many thanks to Imran Ali of Carbon Imagineering and Craig Smith of O'Reilly GMT for organising a great evening of interesting talks!
