The first Truly Concurrent Kamaelia Component

November 25, 2007 at 10:20 PM | categories: python, oldblog

OK, this uses a massively simplified version of the primitives needed for concurrency in Kamaelia, but the following is the first component that will happily run completely in parallel with the rest of the system:
class FirstProcessBasedComponent(SimplestProcessComponent):
    def main(self):
        while 1:
            yield 1
            time.sleep(0.3)
            if self.dataReady():
                print time.time(),"main : RECEIVE FROM CHANNEL", self.recv()
            else:
                print time.time(),"main: CHANNEL NOT READY"


As you can see, this is pretty much identical to the traditional Kamaelia model. Indeed, change the base class and you get a single-threaded component, though you'd probably want to change the time.sleep behaviour, since a sleep inside a single-threaded component stalls everything else sharing that thread.
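
To make the base class swap concrete, here is a rough sketch of what a single-threaded version might look like. It is written for Python 3, unlike the listings in this post, and it is not Kamaelia's real component API: SimplestGeneratorComponent, deliver, FirstGeneratorBasedComponent and run_demo are all hypothetical names made up for illustration. The time.sleep call is dropped, and results are collected in a list rather than printed so they are easy to inspect.

```python
class SimplestGeneratorComponent(object):
    """Hypothetical single-threaded stand-in for SimplestProcessComponent."""

    def __init__(self):
        self.inbound = []

    def activate(self):
        # No new process: hand back the generator and let the caller's
        # loop act as the scheduler.
        return self.main()

    def deliver(self, message):
        # Hypothetical helper: plays the role of the channel in this sketch.
        self.inbound.append(message)

    def dataReady(self):
        return len(self.inbound) > 0

    def recv(self):
        return self.inbound.pop(0)

    def main(self):
        yield 1


class FirstGeneratorBasedComponent(SimplestGeneratorComponent):
    def __init__(self):
        super().__init__()
        self.log = []

    def main(self):
        while 1:
            yield 1
            # No time.sleep here: sleeping would stall every other
            # component running in the same thread.
            if self.dataReady():
                self.log.append(("RECEIVE", self.recv()))
            else:
                self.log.append(("NOT READY", None))


def run_demo(steps=4):
    component = FirstGeneratorBasedComponent()
    task = component.activate()
    for step in range(steps):
        if step == 2:
            component.deliver({"hello": "X"})
        next(task)  # give the component one time slice
    return component.log
```

The body of main is unchanged apart from the sleep and the logging; only the base class and the way the component is driven differ.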

The advantage here, of course, is that, given a bit more work, we should be able to take the entirety of Kamaelia's component set and simply parallelise it where it makes sense. The most obvious way would be as a specialised Chassis (other chassis are Pipeline, Graphline and Carousel, which is a bit brain numbing :) ).

Full code, which is currently in /Sketches, looks like this:
import pprocess
import time

class SimplestProcessComponent(object):
    def __init__(self):
        self.exchange = pprocess.Exchange()
        self.channel = None
        self.inbound = []

    def activate(self):
        channel = pprocess.start(self.run, None, None, named1=None, named2=None)
        exchange = pprocess.Exchange()
        exchange.add(channel)
        return exchange

    def run(self, channel, arg1, arg2, named1=None, named2=None):
        self.exchange.add(channel)
        self.channel = channel
        for i in self.main():
            pass

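    def dataReady(self):
        # True if a message is already buffered or a channel has data waiting
        return bool(self.inbound) or bool(self.exchange.ready(timeout=0))

    def recv(self):
        # Drain every ready channel into the buffer, then hand back the
        # oldest message. Call dataReady() first: this raises IndexError
        # if nothing has arrived.
        for ch in self.exchange.ready(timeout=0):
            self.inbound.append(ch.receive())
        return self.inbound.pop(0)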

    def main(self):
        yield 1

class FirstProcessBasedComponent(SimplestProcessComponent):
    def main(self):
        while 1:
            yield 1
            time.sleep(0.3)
            if self.dataReady():
                print time.time(),"main : RECEIVE FROM CHANNEL", self.recv()
            else:
                print time.time(),"main: CHANNEL NOT READY"

exchange = FirstProcessBasedComponent().activate()

while 1:
    time.sleep(0.7)
    print time.time(),"__main__ : SENDING TO CHANNEL"
    if exchange.ready(timeout=0):
        for ch in exchange.ready():
            ch.send({"hello":"X"})
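
For comparison, the same shape can be sketched with the standard library's multiprocessing module instead of pprocess. This is a swapped-in technique rather than the code in /Sketches, and it makes a few assumptions: Python 3, a POSIX system where the "fork" start method is available, and hypothetical names throughout (PipeProcessComponent, EchoComponent, demo). The component echoes each message back so the round trip can be checked, and a None message is used as a made-up shutdown signal.

```python
import multiprocessing
import time

# Assumption: POSIX with the "fork" start method, so the child process
# inherits the component object without any pickling.
_ctx = multiprocessing.get_context("fork")


class PipeProcessComponent(object):
    """Hypothetical multiprocessing analogue of SimplestProcessComponent."""

    def __init__(self):
        self.conn = None
        self.inbound = []

    def activate(self):
        parent_end, child_end = _ctx.Pipe()
        proc = _ctx.Process(target=self.run, args=(child_end,))
        proc.daemon = True
        proc.start()
        return parent_end, proc

    def run(self, conn):
        # Runs in the child process: drive the generator to completion.
        self.conn = conn
        for _ in self.main():
            pass

    def dataReady(self):
        return bool(self.inbound) or self.conn.poll(0)

    def recv(self):
        # Buffer everything currently waiting, return the oldest message.
        while self.conn.poll(0):
            self.inbound.append(self.conn.recv())
        return self.inbound.pop(0)

    def main(self):
        yield 1


class EchoComponent(PipeProcessComponent):
    def main(self):
        while 1:
            yield 1
            if self.dataReady():
                message = self.recv()
                if message is None:  # made-up shutdown signal
                    return
                self.conn.send(("echo", message))
            else:
                time.sleep(0.01)


def demo():
    conn, proc = EchoComponent().activate()
    conn.send({"hello": "X"})
    reply = conn.recv()  # blocks until the child answers
    conn.send(None)
    proc.join(timeout=5)
    return reply
```

Calling demo() starts the component in its own process, sends it one message and returns whatever comes back down the pipe.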

Personally, I find true but simple concurrency a really nice, fun idea :-)