EuroPython Video Transcoding

July 07, 2009 at 01:02 AM | categories: python, oldblog | View Comments

Since I've had a few questions about this, here's a short status update. At EuroPython last week I recorded all the talks I attended. Including the lightning talks, this means I have video from 55 talks. The video files from the camera are too large for blip.tv, so I'm transcoding them down to a smaller size before uploading them. Since these 55 talks are spread over nearly 80 files, that naturally takes time.

Fortunately (or obviously), I'm automating this, and it'll come as no shock to some that I'm automating it using Kamaelia. This automation needs to be stoppable, since for practical reasons I can only run it overnight.

Anyway, for those curious, this is the code I'm using to do the transcode and upload. You'll note that it saturates my CPU, keeping both cores busy. It also interleaves an I/O-bound process (the FTP upload) with a CPU-bound one (the transcode).

import os
import re
import Axon

from Kamaelia.Chassis.Graphline import Graphline
from Kamaelia.Chassis.Pipeline import Pipeline

class Find(Axon.Component.component):
    """Walk a directory tree and send each entry's path on "outbox".

    Attributes (set via constructor keywords, as in
    ``Find(path=".", walktype="f")``):
        path: root directory to walk.
        walktype: "a" sends files and directories, "f" only files,
            "d" only directories.
        act_like_find: if true, a subdirectory that cannot be listed
            (OSError) is skipped instead of aborting the walk.

    When the walk completes, a producerFinished message is sent on "signal".
    If a message arrives on "control" first, the walk stops and that message
    is forwarded on "signal" instead.
    """
    path = "."
    walktype = "a"
    act_like_find = True

    def find(self, path = ".", walktype="a"):
        """Yield paths under *path* depth-first, each directory before its contents.

        Raises ValueError for an unknown *walktype*. An OSError from listing
        *path* itself always propagates. An OSError from listing a
        subdirectory propagates only when act_like_find is false.

        NOTE: os.path.isdir follows symlinks, so symlinked directories are
        descended into (unlike find's default) and a symlink cycle would not
        terminate.
        """
        # walktype -> (addfiles, adddirs)
        walktypes = {
            "a": (True, True),
            "f": (True, False),
            "d": (False, True),
        }
        try:
            addfiles, adddirs = walktypes[walktype]
        except KeyError:
            raise ValueError("walktype must be 'a', 'f' or 'd', not %r" % (walktype,))

        # Stack of directory-listing iterators. The last element is the
        # directory currently being listed, which gives a pre-order
        # depth-first walk without recursion.
        stack = [iter([os.path.join(path, x) for x in os.listdir(path)])]
        while stack:
            try:
                fullentry = next(stack[-1])
            except StopIteration:
                # This directory is exhausted; resume its parent.
                stack.pop()
                continue

            if os.path.isdir(fullentry):
                if adddirs:
                    yield fullentry
                try:
                    children = [os.path.join(fullentry, x) for x in os.listdir(fullentry)]
                except OSError:
                    # e.g. permission denied: like find(1), carry on past it.
                    if not self.act_like_find:
                        raise
                else:
                    stack.append(iter(children))
            elif os.path.isfile(fullentry):
                if addfiles:
                    yield fullentry

    def main(self):
        """Send one found path per scheduler slice, then signal completion."""
        for entry in self.find(path=self.path, walktype=self.walktype):
            self.send(entry, "outbox")
            yield 1
            if self.dataReady("control"):
                # Asked to stop early: pass the shutdown message downstream.
                self.send(self.recv("control"), "signal")
                break
        else:
            self.send(Axon.Ipc.producerFinished(), "signal")

class Sort(Axon.Component.component):
    """Buffer every inbox item until a control message arrives, then emit them sorted.

    Items are sent on "outbox" in ascending order, one per scheduler slice.
    The control message is then forwarded on "signal".
    """
    def main(self):
        collected = []
        while True:
            collected.extend(self.Inbox("inbox"))
            if self.dataReady("control"):
                break
            self.pause()
            yield 1

        for item in sorted(collected):
            self.send(item, "outbox")
            yield 1
        self.send(self.recv("control"), "signal")

class Grep(Axon.Component.component):
    """Forward inbox items that match (or, with invert, do not match) a regex.

    pattern: regular expression tested with re.search against each item.
    invert: if true, forward only the items that do NOT match.
    Runs until a "control" message arrives, which is then forwarded on "signal".
    """
    pattern = "."
    invert = False

    def main(self):
        regex = re.compile(self.pattern)
        while 1:
            for item in self.Inbox("inbox"):
                matched = regex.search(item) is not None
                # Forward exactly when "matched" and "invert" disagree.
                if matched != bool(self.invert):
                    self.send(item, "outbox")
            if self.dataReady("control"):
                break
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

class TwoWayBalancer(Axon.Component.component):
    """Deal inbox items alternately to "outbox1" and "outbox2".

    The first item goes to outbox1, the next to outbox2, and so on. When a
    control message arrives, it is sent on both "signal1" and "signal2" and
    the component stops.
    """
    Outboxes = ["outbox1", "outbox2", "signal1", "signal2"]

    def main(self):
        destinations = ("outbox1", "outbox2")
        turn = 0
        while 1:
            yield 1
            for job in self.Inbox("inbox"):
                self.send(job, destinations[turn])
                turn = 1 - turn
            if not self.anyReady():
                self.pause()
            if self.dataReady("control"):
                break

        shutdown = self.recv("control")
        for box in ("signal1", "signal2"):
            self.send(shutdown, box)


class Transcoder(Axon.ThreadedComponent.threadedcomponent):
    """Transcode each source video received on "inbox", then queue it for upload.

    For each source path received, the component:
      1. runs ``command`` (ffmpeg), writing <basename>.avi into the current
         directory and ffmpeg's output to transcode.log;
      2. moves the source file under "done/";
      3. moves the encoded file into "to_upload/";
      4. sends (upload_name, finalname) on "outbox".
    If the command exits non-zero, nothing is moved and the file is skipped.
    A message on "control" stops the component and is forwarded on "signal".
    """
    # Both file names are double-quoted so paths containing spaces survive the
    # shell. Double quotes do NOT protect against '"', '$' or '`' in names.
    command = 'ffmpeg >transcode.log 2>&1 -i "%(SOURCEFILE)s" -s 640x360 -vcodec mpeg4 -acodec copy -vb 1500000 "%(ENCODINGNAME)s"'

    def main(self):
        while 1:
            for sourcefile in self.Inbox("inbox"):
                shortname = os.path.basename(sourcefile)
                # Replace the extension rather than every ".mp4" substring.
                # This also means e.g. a *.MP4 source never gets an output
                # name equal to its own name.
                encoding_name = os.path.splitext(shortname)[0] + ".avi"
                finalname = os.path.splitext(sourcefile)[0] + ".avi"

                # Do the actual transcode
                print("TRANSCODING %s %s" % (sourcefile, encoding_name))
                status = os.system(self.command % {"SOURCEFILE": sourcefile, "ENCODINGNAME": encoding_name})
                if status != 0:
                    # Leave the source in place so it can be retried; the
                    # renames below would also fail without an output file.
                    print("TRANSCODE FAILED (status %s), SKIPPING %s" % (status, sourcefile))
                    print("-----------------")
                    continue

                # file is transcoded, move to done
                done_name = os.path.join("done", sourcefile)
                print("MOVING DONE FILE %s %s" % (sourcefile, done_name))
                os.rename(sourcefile, done_name)

                # Move encoded version to upload queue
                upload_name = os.path.join("to_upload", encoding_name)
                print("MOVING TO UPLOAD QUEUE %s %s" % (encoding_name, upload_name))
                os.rename(encoding_name, upload_name)

                # And tell the uploader to upload it please
                print("SETTING OFF UPLOAD %s %s" % (upload_name, finalname))
                self.send((upload_name, finalname), "outbox")
                print("-----------------")
            if self.dataReady("control"):
                break
            # Block until more work or a shutdown arrives instead of spinning.
            if not self.anyReady():
                self.pause()
        self.send(self.recv("control"), "signal")

class Uploader(Axon.ThreadedComponent.threadedcomponent):
    """FTP-upload each encoded file received on "inbox", then file it under "encoded/".

    Each inbox message is an (upload_name, finalname) tuple. upload_name is
    the file to send. After a successful upload it is renamed to
    encoded/<finalname>. If the upload command exits non-zero, the file is
    left in place so a later run can retry it. A message on "control" stops
    the component and is forwarded on "signal".
    """
    # Values are double-quoted so file names with spaces survive the shell.
    # NOTE: the password appears on the command line, where other local
    # users may be able to see it (e.g. via ps).
    command = 'ftpput --server="%(HOSTNAME)s" --verbose --user="%(USERNAME)s" --pass="%(PASSWORD)s" --binary --passive "%(UPLOADFILE)s"'
    # The credentials were redacted from the published script; read them from
    # the environment instead of hard-coding them.
    username = os.environ.get("BLIP_FTP_USERNAME", "")
    password = os.environ.get("BLIP_FTP_PASSWORD", "")
    hostname = "ftp.blip.tv"

    def main(self):
        while 1:
            for (upload_name, finalname) in self.Inbox("inbox"):
                print("UPLOADING %s" % (upload_name,))
                status = os.system(self.command % {
                    "HOSTNAME": self.hostname,
                    "USERNAME": self.username,
                    "PASSWORD": self.password,
                    "UPLOADFILE": upload_name,
                })
                if status != 0:
                    # Assumes ftpput follows the usual 0-on-success convention.
                    # Keep the file in the upload queue so it can be retried.
                    print("UPLOAD FAILED (status %s), LEAVING %s IN PLACE" % (status, upload_name))
                    print("-----------------")
                    continue

                encoded_name = os.path.join("encoded", finalname)
                print("MOVING %s TO %s" % (upload_name, encoded_name))
                os.rename(upload_name, encoded_name)
                print("-----------------")

            if self.dataReady("control"):
                break
            if not self.anyReady():
                self.pause()
        self.send(self.recv("control"), "signal")

if __name__ == "__main__":
    # Find files -> sort -> drop working dirs and the log -> deal the files
    # alternately to two transcode+upload chains.
    Graphline(
        FILES = Pipeline(
                    Find(path=".", walktype="f"),
                    Sort(),
                    # Skip anything under the working directories and the
                    # ffmpeg log. NOTE: this is a substring match on the whole
                    # path, so a file whose name merely contains e.g. "done"
                    # is skipped too.
                    Grep(pattern=r"(done|encoded|unsorted|transcode\.log|to_upload)",
                         invert = True),
                ),
        SPLIT = TwoWayBalancer(), # Would probably be nicer as a customised PAR chassis
        CONSUME1 = Pipeline(
                        Transcoder(),
                        Uploader(),
                   ),
        CONSUME2 = Pipeline(
                        Transcoder(),
                        Uploader(),
                   ),
        linkages = {
            ("FILES","outbox"):("SPLIT","inbox"),
            ("SPLIT","outbox1"):("CONSUME1","inbox"),
            ("SPLIT","outbox2"):("CONSUME2","inbox"),

            ("FILES","signal"):("SPLIT","control"),
            ("SPLIT","signal1"):("CONSUME1","control"),
            ("SPLIT","signal2"):("CONSUME2","control"),
        }
    ).run()

It should be fairly clear that this will go as fast as it can, so please be patient :-)


blog comments powered by Disqus