Walk through of how to build a simple protocol using Kamaelia

October 07, 2008 at 02:23 AM | categories: python, oldblog | View Comments

A question was raised on the Kamaelia list asking for a demo of how to nest protocols using Kamaelia. To keep things clear, and to avoid a protocol mismatch, I decided to build a protocol which is a composite of the following 3 things:
  • Something clumping data into messages
  • JSON data being forwarded in serialised form as those messages
  • The serialised messages encrypted (naively) using DES encryption.
Rather than repost all the steps leading to that here, you can find the post that does just that here:
You'd probably want to take the resulting code a little further than I've taken it if you were to use it in production, since (for example) DataChunker is designed to be used with larger & larger numbers of messages. It would be relatively simple to transform this into a pub/sub system/router for sending and receiving Javascript fragments, but I wanted to leave the example looking relatively simple for now. I may come back to that as a pub/sub demo.

The final example looks like this:
 
#!/usr/bin/python

import cjson
import Axon

from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Chooser import Chooser
from Kamaelia.Util.Console import ConsoleEchoer

from Kamaelia.Internet.TCPClient import TCPClient
from Kamaelia.Chassis.ConnectedServer import ServerCore

from Kamaelia.Protocol.Framing import DataChunker, DataDeChunker
from Kamaelia.Apps.Grey.PeriodicWakeup import PeriodicWakeup

from Crypto.Cipher import DES

# Sample payloads for the demo: four JSON-serialisable dicts, repeated ten
# times over (the repeats refer to the same four dict objects).
messages = 10 * [
    {"hello": "world"},
    {"hello": [1, 2, 3]},
    {"world": [1, 2, 3]},
    {"world": {"game": "over"}},
]

class MarshallJSON(Axon.Component.component):
    """Component that JSON-encodes everything arriving on "inbox".

    Each item read from "inbox" is passed through cjson.encode and the
    result is sent to "outbox".
    """

    def main(self):
        # Loop until something is waiting on the "control" inbox.
        while not self.dataReady("control"):
            for message in self.Inbox("inbox"):
                self.send(cjson.encode(message), "outbox")
            # No inbox has anything waiting, so pause before yielding.
            if not self.anyReady():
                self.pause()
            yield 1

class DeMarshallJSON(Axon.Component.component):
    """Component that JSON-decodes everything arriving on "inbox".

    Each item read from "inbox" is passed through cjson.decode and the
    resulting object is sent to "outbox".
    """

    def main(self):
        # Loop until something is waiting on the "control" inbox.
        while not self.dataReady("control"):
            for serialised in self.Inbox("inbox"):
                self.send(cjson.decode(serialised), "outbox")
            # No inbox has anything waiting, so pause before yielding.
            if not self.anyReady():
                self.pause()
            yield 1

class Encoder(object):
    """Null encoder/base encoder - returns the same string
    for encode/decode.

    Subclasses override encode() and decode() to apply a real
    transformation.
    """

    def __init__(self, key, **argd):
        """Store the key and any extra keyword arguments as attributes.

        key  -- value kept on the instance as ``self.key``.
        argd -- arbitrary extra keyword arguments; each becomes an
                instance attribute of the same name.
        """
        # The base class here is ``object``, whose __init__ rejects keyword
        # arguments, so the extras must not be forwarded to it.
        super(Encoder, self).__init__()
        self.__dict__.update(argd)
        # Assigned last so the explicit positional key is what ends up
        # stored on the instance.
        self.key = key

    def encode(self, some_string):
        """Return some_string unchanged."""
        return some_string

    def decode(self, some_string):
        """Return some_string unchanged."""
        return some_string

class DES_CRYPT(Encoder):
    """Encoder that encrypts/decrypts strings with DES in ECB mode.

    Data is padded to a multiple of 8 characters before encryption, and
    the padding is stripped again after decryption.

    NOTE(review): the chr()/ord() based padding assumes Python 2 style
    byte strings - confirm before running under Python 3.
    """

    def __init__(self, key, **argd):
        """Create the cipher object.

        key  -- key string; it is padded and then cut to exactly 8
                characters, so shorter keys are padded and longer keys
                are truncated.
        argd -- extra keyword arguments passed through to Encoder.
        """
        super(DES_CRYPT, self).__init__(key, **argd)
        self.key = self.pad_eight(key)[:8]
        self.obj = DES.new(self.key, DES.MODE_ECB)

    def encode(self, some_string):
        """Return some_string padded to a multiple of 8 and encrypted."""
        return self.obj.encrypt(self.pad_eight(some_string))

    def decode(self, some_string):
        """Return some_string decrypted, with the padding removed."""
        return self.unpad(self.obj.decrypt(some_string))

    def pad_eight(self, some_string):
        """Return some_string extended to a multiple of 8 characters.

        Between 1 and 8 pad characters are always appended; each pad
        character is chr(number of pad characters), which is what lets
        unpad() recover the original length.
        """
        # When the length is already a multiple of 8 this gives 8, i.e. a
        # whole extra block of padding, so unpad() always has a pad to strip.
        pad_needed = 8 - (len(some_string) % 8)
        return some_string + pad_needed * chr(pad_needed)

    def unpad(self, some_string):
        """Strip the padding added by pad_eight().

        The final character encodes how many characters to remove.
        An empty string is returned unchanged.
        """
        if not some_string:
            # Nothing to inspect; avoid indexing into an empty string.
            return some_string
        pad_length = ord(some_string[-1])
        return some_string[:-pad_length]

class Encrypter(Axon.Component.component):
    """Component that encrypts everything arriving on "inbox".

    Each item is run through DES_CRYPT.encode (using the class-level
    ``key``) and the result is sent to "outbox".
    """

    key = "ABCD"  # key handed to DES_CRYPT when main() starts

    def main(self):
        cipher = DES_CRYPT(self.key)
        # Loop until something is waiting on the "control" inbox.
        while not self.dataReady("control"):
            for plaintext in self.Inbox("inbox"):
                self.send(cipher.encode(plaintext), "outbox")
            # No inbox has anything waiting, so pause before yielding.
            if not self.anyReady():
                self.pause()
            yield 1

class Decrypter(Axon.Component.component):
    """Component that decrypts everything arriving on "inbox".

    Each item is run through DES_CRYPT.decode (using the class-level
    ``key``) and the result is sent to "outbox".
    """

    key = "ABCD"  # key handed to DES_CRYPT when main() starts

    def main(self):
        cipher = DES_CRYPT(self.key)
        # Loop until something is waiting on the "control" inbox.
        while not self.dataReady("control"):
            for ciphertext in self.Inbox("inbox"):
                self.send(cipher.decode(ciphertext), "outbox")
            # No inbox has anything waiting, so pause before yielding.
            if not self.anyReady():
                self.pause()
            yield 1

def protocol(*args, **argd):
    """Build the sending-side pipeline.

    The stages, in order: a PeriodicWakeup emitting "NEXT" at interval 1,
    a Chooser over ``messages``, JSON marshalling, encryption, and
    DataChunker framing. Any arguments passed in are accepted but unused.
    """
    stages = [
        PeriodicWakeup(message="NEXT", interval=1),
        Chooser(messages),
        MarshallJSON(),
        Encrypter(),  # Encrypt on the way out
        DataChunker(),
    ]
    return Pipeline(*stages)

def json_client_prefab(ip, port):
    """Build the receiving-side pipeline for the given address.

    The stages, in order: a TCPClient connecting to ip/port, DataDeChunker
    de-framing, decryption, JSON de-marshalling, and a ConsoleEchoer
    created with use_repr=True.
    """
    stages = [
        TCPClient(ip, port=port),
        DataDeChunker(),
        Decrypter(),  # Decrypt on the way in
        DeMarshallJSON(),
        ConsoleEchoer(use_repr=True),
    ]
    return Pipeline(*stages)

# Activate a server on port 2345 that uses protocol() as its protocol
# factory, then run a client pipeline pointed at that same port on localhost.
ServerCore(port=2345, protocol=protocol).activate()
json_client_prefab(ip="127.0.0.1", port=2345).run()
blog comments powered by Disqus