Readable concurrency in Python

March 16, 2014 at 05:30 PM | categories: python, actors, concurrency, kamaelia

Last week there were a couple of interesting posts by Glyph Lefkowitz and Rob Miller on concurrency. Both are well worth a read. One of the examples presented by Glyph is the canonical concurrent update problem. This essentially happens when an update takes multiple steps and another thread can interfere between them. Rob's post presents a solution in Go.
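To make the concurrent update problem concrete, here is a minimal, deterministic sketch in plain Python 3 (not Guild). Each withdrawal is written as a generator whose yield points mark where a thread switch could happen. The interleaving is chosen by hand rather than by a real scheduler, and the function and variable names are illustrative assumptions.

```python
def withdraw_steps(account, amount, paid_out):
    """One withdrawal, split into the separate steps an unsynchronised
    thread would perform. Each yield is a point where another thread could run."""
    balance = account["balance"]                 # step 1: read the shared balance
    yield
    if balance >= amount:                        # step 2: check a possibly stale copy
        yield
        account["balance"] = balance - amount    # step 3: write back
        paid_out.append(amount)


# Interleaved: both "threads" read the balance before either writes it back.
account = {"balance": 100}
paid_out = []
fred = withdraw_steps(account, 80, paid_out)
barney = withdraw_steps(account, 80, paid_out)
for step in (fred, barney, fred, barney, fred, barney):
    next(step, None)                             # None swallows the final StopIteration
print(account["balance"], sum(paid_out))         # 20 160

# Serialised: each withdrawal finishes before the next one starts.
account2 = {"balance": 100}
paid_out2 = []
for task in (withdraw_steps(account2, 80, paid_out2),
             withdraw_steps(account2, 80, paid_out2)):
    for _ in task:
        pass
print(account2["balance"], sum(paid_out2))       # 20 80
```

In the interleaved run 160 is paid out of an account that only ever held 100, yet the balance still reads 20. Running the same steps one withdrawal at a time pays out 80 and refuses the second request.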

The core of this is that unconstrained shared mutable state in a concurrent situation is a bad idea. This is something that I've spoken about in the past with regard to Kamaelia. In fact, in Kamaelia, there were two ways of handling this. One was to essentially funnel all requests for updating values through a "cashier" component. The other was to use software transactional memory. Kamaelia provides tools for both approaches.

Guild also provides tools for both approaches. The key tool for the cashier approach is guild.actor. The key tool for the STM approach is guild.stm.
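The idea behind the STM approach can be sketched in a few lines: read a value together with a version number, compute the new value, and only commit if nobody else has committed in the meantime. This is a minimal illustration under stated assumptions: VersionedStore, checkout, commit, update and ConcurrentUpdate are hypothetical names chosen for the example, not guild.stm's actual API, and the lock only protects the brief check-and-set inside commit.

```python
import threading


class ConcurrentUpdate(Exception):
    """Raised when a commit finds the value has changed since it was read."""


class VersionedStore(object):
    """Hypothetical versioned store illustrating optimistic commits."""
    def __init__(self):
        self._lock = threading.Lock()   # held only for the check-and-set below
        self._data = {}                 # key -> (version, value)

    def checkout(self, key):
        with self._lock:
            return self._data.get(key, (0, None))

    def commit(self, key, read_version, new_value):
        with self._lock:
            current_version, _ = self._data.get(key, (0, None))
            if current_version != read_version:
                raise ConcurrentUpdate(key)   # someone else committed first
            self._data[key] = (current_version + 1, new_value)


def update(store, key, fn):
    """Apply fn to the current value, retrying until the commit succeeds."""
    while True:
        version, value = store.checkout(key)
        try:
            store.commit(key, version, fn(value))
            return
        except ConcurrentUpdate:
            continue


store = VersionedStore()
store.commit("balance", 0, 100)

# Two clients check out the same version of the balance...
v1, bal1 = store.checkout("balance")
v2, bal2 = store.checkout("balance")
store.commit("balance", v1, bal1 - 80)        # ...the first commit wins
try:
    store.commit("balance", v2, bal2 - 80)    # the stale one is rejected, not applied
    conflict = False
except ConcurrentUpdate:
    conflict = True

update(store, "balance", lambda b: b + 50)    # a retrying update on top
print(conflict, store.checkout("balance"))    # True (3, 70)
```

Compared with the cashier approach, nothing is funnelled through a single thread here. Instead, a conflicting update is detected at commit time and either reported to the caller (commit) or retried (update).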

Given that Glyph's and Rob's posts touched on ideas I've spoken about in the past, I thought it might be nice to work through the example in Guild. Initially we'll model an account as an actor, and test it with some basic single-threaded code. Then we'll test it with 3 actors randomly withdrawing cash and 1 more randomly adding cash. Finally, we'll show what happens when 2 actors each have an account and randomly transfer money out of each other's accounts. (Interestingly, this last one kinda makes certain ideas of banking clearer to me :-)

Because they were the first names that sprang to mind, this post uses the names of characters from the Flintstones. I blame Red Dwarf.

Basic Account Actor

Source: guild/examples/blog/account-1.py

So, first of all, the account actor using Guild. As before, the code comes first, then a discussion.

from guild.actor import *

class InsufficientFunds(ActorException):
    pass

class Account(Actor):
    def __init__(self, balance=10):
        super(Account, self).__init__()
        self.balance = balance

    @actor_function
    def deposit(self, amount):
        # I've made this a function to allow the value to be confirmed deposited
        print "DEPOSIT", "\t", amount, "\t", self.balance
        self.balance = self.balance + amount
        return self.balance

    @actor_function
    def withdraw(self, amount):
        if self.balance < amount:
            raise InsufficientFunds("Insufficient Funds in your account",
                                    requested=amount, balance=self.balance)
        self.balance = self.balance - amount
        print "WITHDRAW", "\t", amount, "\t", self.balance
        return amount

This should be pretty readable.

First of all the logic of what's happening:

  • We define an exception InsufficientFunds to raise when someone tries to withdraw more money than the account contains

  • We define a subclass of Actor, called Account. Since we need to initialise it with a balance, we must call the superclass initialiser, super(Account, self).__init__(), before setting the balance. Our Account objects have 2 operations that they can perform: deposit and withdraw.

  • withdraw checks that sufficient funds are available. If they are, then the funds are returned as a result, after updating the balance and logging the result. If they are not, an InsufficientFunds exception is raised, which the caller thread will have to deal with.

  • deposit takes the amount of funds, updates the balance, logs the results and returns the new balance to the caller thread.

What's happening in terms of mechanics?

  • withdraw is an actor_function. What does this mean? When a caller calls the method, the caller's thread places the message ((withdraw, self, amount), resultQueue) onto the actor's inbound queue and then waits for a response. The actor receives the message, does the work, and posts the result back down the result queue. The caller's thread retrieves this, and the call returns it to the caller. If an exception was raised within the actor, it is re-raised inside the caller's thread. As a result, our withdraw function can look pretty normal. If there are insufficient funds, the caller gets an exception to deal with. If there are sufficient funds, the balance is updated, a message is logged to the console, and the amount of money is returned to the caller.

  • deposit is also an actor_function. It doesn't strictly need to be, because depositing money always succeeds, but it's nice for deposit to return the updated balance to the caller. (If the caller doesn't care, this could be an actor_method instead.)
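To see these mechanics in isolation, here is a minimal stand-in for the behaviour described above, written against Python 3's standard threading and queue modules rather than Guild. MiniActor and this actor_function decorator are hypothetical simplifications, not Guild's implementation: each actor runs one thread that takes ((method, self, args), result_queue) messages off its inbound queue and replies with (None, result) or (exception, None). (The transfer walkthrough later in the post uses 0 rather than None to mean "no exception".)

```python
import functools
import queue
import threading


class InsufficientFunds(Exception):
    pass


def actor_function(method):
    """Hypothetical stand-in: run method in the actor's thread and block the
    caller until the result, or the exception, comes back."""
    @functools.wraps(method)
    def wrapper(self, *args):
        result_queue = queue.Queue()
        self.inbound.put(((method, self, args), result_queue))
        error, result = result_queue.get()   # the caller's thread waits here
        if error is not None:
            raise error                      # re-raised in the caller's thread
        return result
    return wrapper


class MiniActor(object):
    def __init__(self):
        self.inbound = queue.Queue()
        self._thread = threading.Thread(target=self._main, daemon=True)

    def go(self):
        self._thread.start()
        return self

    def stop(self):
        self.inbound.put(None)               # sentinel: ask the actor loop to exit

    def join(self):
        self._thread.join()

    def _main(self):
        # Messages are handled one at a time, so actor state is only ever
        # touched by this thread.
        while True:
            message = self.inbound.get()
            if message is None:
                return
            (method, obj, args), result_queue = message
            try:
                result_queue.put((None, method(obj, *args)))
            except Exception as e:
                result_queue.put((e, None))


class Account(MiniActor):
    def __init__(self, balance=10):
        super(Account, self).__init__()
        self.balance = balance

    @actor_function
    def deposit(self, amount):
        self.balance = self.balance + amount
        return self.balance

    @actor_function
    def withdraw(self, amount):
        if self.balance < amount:
            raise InsufficientFunds(amount, self.balance)
        self.balance = self.balance - amount
        return amount


account = Account(100).go()
new_balance = account.deposit(50)
taken = account.withdraw(30)
try:
    account.withdraw(1000)
    caught = None
except InsufficientFunds as e:
    caught = e                               # raised in the actor, caught here
account.stop()
account.join()
print(new_balance, taken, caught.args, account.balance)   # 150 30 (1000, 120) 120
```

Because the caller blocks on its own result queue, withdraw reads like an ordinary method call even though its body runs in another thread. A fire-and-forget variant, closer in spirit to the actor_method mentioned above, would put the message on the inbound queue and return without waiting.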

Single threaded Account user

Source: guild/examples/blog/account-1.py

So, let's use this. In the main thread we'll create the account and start it. We'll then define 3 account users who only ever withdraw funds: Fred, Barney and Wilma. In our simulation Betty is the person earning money, but she isn't named in the code; her earnings are the account.deposit(100) call.

Anyway, in each iteration through this loop, Betty earns 100, and Fred, Barney and Wilma each randomly pick an amount to withdraw from 10, 20, 40, 80 or 160. This continues until someone tries to take more money than is in the account. We then report the amounts grabbed, stop the account and exit. The code looks like this:

import random

account = Account(1000).go()

fred, barney, wilma = 0,0,0
try:
    while True:
        account.deposit(100)
        fred += account.withdraw(random.choice([10,20,40,80,160]))
        barney += account.withdraw(random.choice([10,20,40,80,160]))
        wilma += account.withdraw(random.choice([10,20,40,80,160]))
except InsufficientFunds as e:
    print e.message
    print "Balance", e.balance
    print "Requested", e.requested
    print account.balance


print "GAME OVER"

print "Fred grabbed", fred
print "Wilma grabbed", wilma
print "Barney grabbed", barney

account.stop()
account.join()

There's not an awful lot to discuss with this. This logic should be pretty clear. The fact that the account is in a different thread isn't particularly interesting - but it shows the basic logic of depositing/withdrawing funds. What does the output look like?

DEPOSIT         100     1000
WITHDRAW        40      1060
WITHDRAW        40      1020
WITHDRAW        40      980
DEPOSIT         100     980
       ... snip ...
WITHDRAW        160     310
WITHDRAW        160     150
DEPOSIT         100     150
WITHDRAW        160     90
Insufficient Funds in your account
Balance 90
Requested 160
90
GAME OVER
Fred grabbed 800
Wilma grabbed 610
Barney grabbed 800

Multiple threads access the Account

Source: guild/examples/blog/account-2.py

In this example, we create two new actors - MoneyDrain and MoneySource.

  • MoneyDrain - This sits there and repeatedly tries to withdraw random amounts of funds from the Account, and keeps track of how much money it's managed to grab. When the account has insufficient funds to withdraw from, the MoneyDrain gives up, complains, and calls stop() on itself.
  • MoneySource - This sits there, and repeatedly adds a random amount of funds to the account.

We then rewrite our simulation as follows:

  • We still have one shared account.
  • Fred, Betty, and Barney are now all MoneyDrains on Wilma.
  • Wilma is the sole source of income for the group, i.e. a MoneySource.

The system is then started, and runs until Fred, Betty and Barney have all taken as much money as they can. Wilma is then stopped and the totals are reported.

import random

from guild.actor import *

# InsufficientFunds/etc as before

class MoneyDrain(Actor):
    def __init__(self, sharedaccount):
        super(MoneyDrain, self).__init__()
        self.sharedaccount = sharedaccount
        self.grabbed = 0

    @process_method
    def process(self):
        try:
            grabbed = self.sharedaccount.withdraw(random.choice([10,20,40,80,160]))
        except InsufficientFunds as e:
            print "Awww, Tapped out", e.balance, "<", e.requested
            self.stop()
            return
        self.grabbed = self.grabbed + grabbed

class MoneySource(Actor):
    def __init__(self, sharedaccount):
        super(MoneySource, self).__init__()
        self.sharedaccount = sharedaccount

    @process_method
    def process(self):
        self.sharedaccount.deposit(random.randint(1,100))

account = Account(1000).go()

fred = MoneyDrain(account).go()
barney = MoneyDrain(account).go()
betty = MoneyDrain(account).go()

wilma = MoneySource(account).go() # Wilma carries all of them.

wait_for(fred, barney, betty)
wilma.stop()
wilma.join()

account.stop()
account.join()

print "GAME OVER"

print "Fred grabbed", fred.grabbed
print "Barney grabbed", barney.grabbed
print "Betty grabbed", betty.grabbed
print "Total grabbed", fred.grabbed + barney.grabbed + betty.grabbed
print "Since they stopped grabbing..."
print "Money left", account.balance

Things worth noting here - we've got 4 completely separate free-running threads all acting on shared state (shared funds) in a 5th thread. We're able to start them all off, they operate cleanly, and at this level we can trust the behaviour of the 5 threads, because the Account actor ensures that operations on the shared state are serialised into atomic operations. As a result, we can trust this code to behave the way we expect it to.

It's also worth noting that when the withdraw method fails, the exception is thrown inside the appropriate thread. This is visible in the output below: each of the 3 MoneyDrain threads reports running out of funds, and the program only exits once all three have done so. (Two of those messages are interleaved because two threads printed at the same moment.)

What does the output from this look like?

WITHDRAW        10      990
WITHDRAW        20      970
WITHDRAW        40      930
DEPOSIT         74      930
WITHDRAW        20      984
       ... snip ...
WITHDRAW        20      47
DEPOSIT         44      47
WITHDRAW        80      11
WITHDRAW        10      1
Awww, Tapped out 1 < 80
DEPOSIT         93      1
Awww, Tapped outAwww, Tapped out 1 < 160
 1 < 40
DEPOSIT         77      94
GAME OVER
Fred grabbed 220
Barney grabbed 510
Betty grabbed 560
Total grabbed 1290
Since they stopped grabbing...
Money left 171
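The two properties claimed above, serialised updates to the shared balance and each InsufficientFunds surfacing in the thread that made the failing call, can be checked with an accounting identity: the starting balance plus everything deposited must equal what was grabbed plus what is left. The sketch below uses plain Python 3 threads and a hypothetical Cashier class (a single thread that owns the balance, in the spirit of the "cashier" approach) rather than Guild. As assumptions of the sketch, the money source makes a fixed 50 deposits so the run always terminates, and joining the threads stands in for wait_for.

```python
import queue
import random
import threading


class InsufficientFunds(Exception):
    def __init__(self, balance, requested):
        super(InsufficientFunds, self).__init__(balance, requested)
        self.balance = balance
        self.requested = requested


class Cashier(object):
    """Hypothetical cashier: its thread is the only code that touches balance."""
    def __init__(self, balance):
        self.balance = balance
        self.requests = queue.Queue()
        self.thread = threading.Thread(target=self._serve)
        self.thread.start()

    def _serve(self):
        while True:
            op, amount, reply = self.requests.get()
            if op == "stop":
                return
            if op == "deposit":
                self.balance += amount
                reply.put((None, self.balance))
            elif amount > self.balance:
                reply.put((InsufficientFunds(self.balance, amount), None))
            else:
                self.balance -= amount
                reply.put((None, amount))

    def call(self, op, amount):
        reply = queue.Queue()
        self.requests.put((op, amount, reply))
        error, result = reply.get()
        if error is not None:
            raise error          # raised in whichever thread made the call
        return result

    def stop(self):
        self.requests.put(("stop", 0, None))
        self.thread.join()


def money_drain(cashier, name, results):
    grabbed = 0
    while True:
        try:
            grabbed += cashier.call("withdraw", random.choice([10, 20, 40, 80, 160]))
        except InsufficientFunds as e:
            print(name, "tapped out:", e.balance, "<", e.requested)
            break
    results.put((name, grabbed))


def money_source(cashier, name, results, deposits=50):
    deposited = 0
    for _ in range(deposits):
        amount = random.randint(1, 100)
        cashier.call("deposit", amount)
        deposited += amount
    results.put((name, deposited))


cashier = Cashier(1000)
results = queue.Queue()
threads = [threading.Thread(target=money_drain, args=(cashier, name, results))
           for name in ("Fred", "Barney", "Betty")]
threads.append(threading.Thread(target=money_source, args=(cashier, "Wilma", results)))
for t in threads:
    t.start()
for t in threads:
    t.join()
cashier.stop()

totals = dict(results.get() for _ in threads)
grabbed = totals["Fred"] + totals["Barney"] + totals["Betty"]
print("Grabbed", grabbed, "deposited", totals["Wilma"], "left", cashier.balance)
```

The exact numbers vary from run to run, but the identity 1000 + deposited == grabbed + left holds however the threads are scheduled, because every read-modify-write of balance happens inside the cashier's thread.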

Multiple threads transferring funds between multiple Accounts

Source: guild/examples/blog/account-3.py

This final example is a bit of fun, but also explicitly shows how to implement a function for transferring funds. Before the main example, let's look at the transfer function:

def transfer(amount, payer, payee):
    funds = payer.withdraw(amount)
    payee.deposit(funds)

This looks deceptively simple. In practice, what happens is someone calls the function with 2 accounts. The appropriate funds are withdrawn from one account and deposited in the other. Each step is thread safe because the function translates into the following operations:

  • caller: create a ResultQueue
  • caller: create message ((withdraw, payer, amount), ResultQueue) and place it on payer's F_inbound queue
  • caller: wait for a message on ResultQueue
  • payer: receive the message from its F_inbound queue
  • payer: perform the contents of method withdraw(self, amount), putting the result in "result"
  • payer: if an exception is raised, put (exception, None) on ResultQueue
  • payer: if no exception is raised, put (0, result) on ResultQueue
  • caller: if the reply is (exception, None), rethrow the exception
  • caller: if the reply is (0, result), store the result in "funds"
  • caller: create message ((deposit, payee, funds), ResultQueue) and place it on payee's F_inbound queue
  • caller: wait for a message on ResultQueue
  • payee: receive the message from its F_inbound queue
  • payee: perform the contents of method deposit(self, amount), putting the result in "result"
  • payee: if an exception is raised, put (exception, None) on ResultQueue
  • payee: if no exception is raised, put (0, result) on ResultQueue
  • caller: if the reply is (exception, None), rethrow the exception
  • caller: if the reply is (0, result), discard the result and exit the function
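The sequence above can be traced in a single thread with a small simulation. This is an illustrative sketch, not Guild's code: Account, handle_one and call are hypothetical helpers, and call() runs the actor's half of the exchange inline so that every step from the list happens in order in one thread. In Guild, the payer's and payee's own threads would perform the payer/payee steps. Replies use the (0, result) and (exception, None) shapes listed above.

```python
import queue


class InsufficientFunds(Exception):
    pass


class Account(object):
    def __init__(self, balance):
        self.balance = balance
        self.F_inbound = queue.Queue()

    def withdraw(self, amount):
        if self.balance < amount:
            raise InsufficientFunds(amount, self.balance)
        self.balance -= amount
        return amount

    def deposit(self, amount):
        self.balance += amount
        return self.balance

    def handle_one(self):
        # payer/payee: receive a message, run the method, reply on ResultQueue
        (method, obj, args), result_queue = self.F_inbound.get()
        try:
            result = method(obj, *args)
        except Exception as e:
            result_queue.put((e, None))
        else:
            result_queue.put((0, result))


def call(account, method, *args):
    result_queue = queue.Queue()                                    # caller: create a ResultQueue
    account.F_inbound.put(((method, account, args), result_queue))  # caller: send the message
    account.handle_one()                                            # (the actor's thread, in Guild)
    error, result = result_queue.get()                              # caller: wait for the reply
    if isinstance(error, Exception):
        raise error                                                 # caller: rethrow
    return result


def transfer(amount, payer, payee):
    funds = call(payer, Account.withdraw, amount)
    call(payee, Account.deposit, funds)


fred, barney = Account(1000), Account(1000)

# The two halves of transfer(300, fred, barney), with a look at the totals in between.
funds = call(fred, Account.withdraw, 300)
mid_total = fred.balance + barney.balance   # the 300 is held by the caller here
call(barney, Account.deposit, funds)
end_total = fred.balance + barney.balance

try:
    transfer(5000, fred, barney)            # withdraw fails, so nothing is deposited
    failed = False
except InsufficientFunds:
    failed = True
print(mid_total, end_total, failed, fred.balance, barney.balance)  # 1700 2000 True 700 1300
```

The mid_total value points at a detail worth keeping in mind: each call is atomic, but transfer as a whole is two separate calls, so between them the funds exist only in the caller's local variable. An observer summing both balances at that moment would see less than the money in circulation, although the total is back to 2000 once the deposit completes.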

This then allows us to create a MischiefMaker. Our MischiefMaker will be given two accounts - their own and a friend's. They will then repeatedly transfer random amounts of funds out of their friend's account into their own, and keep track of how much money they've grabbed from their friend.

An example of tracing the logic here might be this:

  • Barney/Fred balances: 1000,1000
  • Barney grabs 250, Fred grabs 250 - Barney/Fred balances: 1000,1000
  • Barney grabs 250, Fred grabs 250 - Barney/Fred balances: 1000,1000
  • Barney grabs 250, Fred grabs 250 - Barney/Fred balances: 1000,1000
  • Barney grabs 250, Fred grabs 250 - Barney/Fred balances: 1000,1000
  • Barney grabs 250, Fred grabs 250 - Barney/Fred balances: 1000,1000
  • Barney grabs 500, Fred grabs 250 - Barney/Fred balances: 1250,750
  • Barney grabs 500, Fred grabs 250 - Barney/Fred balances: 1500,500
  • Barney grabs 500, Fred grabs 250 - Barney/Fred balances: 1750,250
  • Barney tries to grab 500, but Fred only has 250 - FAILS, Barney gives up. Fred then continues.

The upshot here is that both Fred and Barney are grabbing what they think is a lot more than 1000 each, even though there's only 2000 in circulation. This seems a bit counter-intuitive, but when you consider that the banking system essentially operates this way - just with more actors - it makes more sense.
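The trace above can be replayed as plain, single-threaded arithmetic to check the numbers. This sketch is illustrative only: the dict-based transfer is a hypothetical simplification with no actors or threads, Barney grabs from Fred first in each round, and the replay stops at Barney's failure rather than letting Fred continue.

```python
class InsufficientFunds(Exception):
    pass


def transfer(amount, balances, payer, payee):
    """Move amount from payer to payee, refusing if payer can't cover it."""
    if balances[payer] < amount:
        raise InsufficientFunds(payer, balances[payer], amount)
    balances[payer] -= amount
    balances[payee] += amount


balances = {"barney": 1000, "fred": 1000}
grabbed = {"barney": 0, "fred": 0}
rounds = [(250, 250)] * 5 + [(500, 250)] * 4   # (Barney's grab, Fred's grab)

for barney_grab, fred_grab in rounds:
    try:
        transfer(barney_grab, balances, payer="fred", payee="barney")
    except InsufficientFunds:
        print("Barney gives up with balances", balances)
        break
    grabbed["barney"] += barney_grab
    transfer(fred_grab, balances, payer="barney", payee="fred")
    grabbed["fred"] += fred_grab

print("grabbed", grabbed, "in circulation", sum(balances.values()))
# grabbed {'barney': 2750, 'fred': 2000} in circulation 2000
```

Between them they "grab" 4750 while the total in circulation never moves from 2000, and Barney gives up with balances of 1750 and 250, matching the last line of the trace.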

So the MischiefMaker code looks like this:

class MischiefMaker(Actor):
    def __init__(self, myaccount, friendsaccount):
        super(MischiefMaker,self).__init__()
        self.myaccount = myaccount
        self.friendsaccount = friendsaccount
        self.grabbed = 0

    @process_method
    def process(self):
        try:
            grab = random.randint(1,10)*10
            transfer(grab, self.friendsaccount, self.myaccount)
        except InsufficientFunds as e:
            print "Awww, Tapped out", e.balance, "<", e.requested
            self.stop()
            return
        self.grabbed = self.grabbed + grab

As before, this should be fairly clear. Each MischiefMaker keeps track of its own account and its friend's account, and with two of them running, transfers occur in both directions as quickly as possible.

account1 = Account(1000).go()
account2 = Account(1000).go()

fred = MischiefMaker(account1, account2).go()
barney = MischiefMaker(account2, account1).go()


wait_for(fred, barney)

account1.stop()
account2.stop()
account1.join()
account2.join()

print "GAME OVER"

print "Fred grabbed", fred.grabbed
print "Barney grabbed", barney.grabbed
print "Total grabbed", fred.grabbed + barney.grabbed
print "Since they stopped grabbing..."
print "Money left", account1.balance, account2.balance

When we run this, all 4 threads are free running. Fred grabs money, Barney grabs money, and the fact that withdraw and deposit are actor_functions ensures that the values in each account are valid at all points in time. The upshot of this is that when the simulation ends, we started with a total of 2000 and we finished with a total of 2000. Snipping the now substantial output somewhat:

INITIAL         1000
INITIAL         1000
WITHDRAW        90      910
WITHDRAW        50      950
DEPOSIT         90      910
DEPOSIT         50      950
WITHDRAW        20      980
WITHDRAW        10      990
DEPOSIT         20      980
DEPOSIT         10      990
WITHDRAW        100     900
WITHDRAW        90      910
DEPOSIT         100     910
DEPOSIT         90      900
        ... snip ...
DEPOSIT         50      850
WITHDRAW        90      810
DEPOSIT         90      1100
WITHDRAW        30      780
        ... snip ...
WITHDRAW        10      100
DEPOSIT         10      1890
WITHDRAW        30      70
DEPOSIT         30      1900
WITHDRAW        20      50
DEPOSIT         20      1930
Awww, Tapped out 50 < 100
GAME OVER
Fred grabbed 27560
Barney grabbed 28350
Total grabbed 55910
Since they stopped grabbing...
Money left 50 1950
Ending money 2000

Incidentally, the thing I like about this example is that it shows Fred and Barney having very large logical incomes from each other, whereas in reality there was a fixed amount of cash. (Essentially this means Fred and Barney are borrowing from each other, much like banks do.)

Conclusion

Not only can concurrency be dealt with sanely - as per Rob's point, it can also look nice, and be developer friendly. If you extend the actor model to include actor_functions, complex problems like concurrent update can become clear to work with.

In a later post I'll go into the internals of how this is implemented, but the description of how the transfer function operates should make it clearer that each actor essentially serialises the actions performed on it, ensuring that actor state can only be updated by one thread at a time.

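The three examples are in the Guild repository:

  • guild/examples/blog/account-1.py
  • guild/examples/blog/account-2.py
  • guild/examples/blog/account-3.py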

If you find this interesting, perhaps give it a try at some point. I personally find it a more practical approach - especially when dealing with things that are naturally concurrent.

Comments welcome.
