Europython Videos Transcoding
July 07, 2009 at 01:02 AM | categories: python, oldblog | View Comments
Since I've had a few questions about this, a short status update. At Europython last week I was recording all the talks I was attending. Including the lightning talks this means I have video from 55 talks. The video files from the camera are too large for blip.tv, so I'm transcoding them down to a smaller size, before uploading them. Since these 55 talks are spread over nearly 80 files, that naturally takes time.
Fortunately/obviously, I'm automating this, and it'll come as no shock to some that I'm automating it using Kamaelia. This automation needs to be stoppable, since I can only run it overnight, for practical reasons.
Anyway, for those curious, this is the code I'm using to do the transcode & upload. You'll note that it saturates my CPU, keeping both cores busy. Also, it's interleaving an IO-bound process (the ftp upload) with a CPU-bound one (the transcode).
It should be fairly clear that this will go as fast as it can, so please be patient :-)
Fortunately/obviously, I'm automating this, and it'll come as no shock to some that I'm automating it using Kamaelia. This automation needs to be stoppable, since I can only run it overnight, for practical reasons.
Anyway, for those curious, this is the code I'm using to do the transcode & upload. You'll note that it saturates my CPU, keeping both cores busy. Also, it's interleaving an IO-bound process (the ftp upload) with a CPU-bound one (the transcode).
import os
import re
import Axon
from Kamaelia.Chassis.Graphline import Graphline
from Kamaelia.Chassis.Pipeline import Pipeline
class Find(Axon.Component.component):
    """Walk a directory tree and send every matching path to "outbox".

    Class attributes (used as the arguments to find() by main()):
        path: directory the walk starts from.
        walktype: "a" emits files and directories, "f" files only,
            "d" directories only.
        act_like_find: when true, a sub-directory that cannot be listed is
            skipped silently; when false the OSError propagates.
    """
    path = "."
    walktype = "a"
    act_like_find = True

    def find(self, path=".", walktype="a"):
        """Yield paths below `path`, depth first, parents before children.

        Raises ValueError for an unknown `walktype` (previously this
        surfaced later as an UnboundLocalError), and OSError if `path`
        itself cannot be listed.
        """
        try:
            addfiles, adddirs = {
                "a": (True, True),
                "f": (True, False),
                "d": (False, True),
            }[walktype]
        except KeyError:
            raise ValueError(
                "walktype must be 'a', 'f' or 'd', not %r" % (walktype,))

        # Stack of iterators, one per directory currently being walked; the
        # innermost directory is on top so its entries are exhausted first.
        pending = [iter([os.path.join(path, name)
                         for name in os.listdir(path)])]
        while pending:
            try:
                fullentry = next(pending[-1])
            except StopIteration:
                # This directory is done; resume its parent.
                pending.pop()
                continue

            if os.path.isdir(fullentry):
                if adddirs:
                    yield fullentry
                try:
                    children = [os.path.join(fullentry, name)
                                for name in os.listdir(fullentry)]
                except OSError:
                    # Unreadable directory: skip it unless told otherwise.
                    if not self.act_like_find:
                        raise
                else:
                    pending.append(iter(children))
            elif os.path.isfile(fullentry):
                if addfiles:
                    yield fullentry

    def main(self):
        """Send one found path per cycle; finish or forward a shutdown."""
        for entry in self.find(path=self.path, walktype=self.walktype):
            self.send(entry, "outbox")
            yield 1
            if self.dataReady("control"):
                # Asked to stop early: hand the message on and stop walking.
                self.send(self.recv("control"), "signal")
                return
        # Walk completed on its own: announce that no more paths will come.
        self.send(Axon.Ipc.producerFinished(), "signal")
class Sort(Axon.Component.component):
    """Buffer everything from "inbox"; on shutdown emit it in sorted order."""

    def main(self):
        """Collect until a "control" message arrives, then send sorted items.

        The control message itself is forwarded to "signal" after the last
        item has been sent.
        """
        collected = []
        while True:
            collected.extend(self.Inbox("inbox"))
            if self.dataReady("control"):
                break
            self.pause()
            yield 1

        for item in sorted(collected):
            self.send(item, "outbox")
            yield 1

        shutdown_msg = self.recv("control")
        self.send(shutdown_msg, "signal")
class Grep(Axon.Component.component):
    """Pass on inbox items that match `pattern` (or, inverted, that don't)."""
    pattern = "."
    invert = False

    def main(self):
        """Filter "inbox" to "outbox" until a "control" message arrives.

        An item is forwarded when `re.search` finds `pattern` in it and
        `invert` is false, or when it finds nothing and `invert` is true.
        The control message is forwarded to "signal" on exit.
        """
        matcher = re.compile(self.pattern)
        while True:
            for candidate in self.Inbox("inbox"):
                found = matcher.search(candidate) is not None
                # Forward exactly when "matched" and "invert" disagree.
                if found != bool(self.invert):
                    self.send(candidate, "outbox")
            if self.dataReady("control"):
                break
            self.pause()
            yield 1

        shutdown_msg = self.recv("control")
        self.send(shutdown_msg, "signal")
class TwoWayBalancer(Axon.Component.component):
    """Deal inbox items alternately to "outbox1" and "outbox2"."""
    Outboxes=["outbox1", "outbox2", "signal1","signal2"]

    def main(self):
        """Alternate jobs between the two outboxes until told to stop.

        The first job goes to "outbox1". On a "control" message the loop
        ends and that message is copied to both "signal1" and "signal2".
        """
        targets = ("outbox1", "outbox2")
        turn = 0
        while True:
            yield 1
            for job in self.Inbox("inbox"):
                self.send(job, targets[turn])
                turn = 1 - turn
            # Do not keep a reference to the last job while idle.
            job = None
            if not self.anyReady():
                self.pause()
            if self.dataReady("control"):
                break

        shutdown_msg = self.recv("control")
        for signal_box in ("signal1", "signal2"):
            self.send(shutdown_msg, signal_box)
class Transcoder(Axon.ThreadedComponent.threadedcomponent):
    """Transcode each source file named on "inbox" and queue it for upload.

    For every path received, runs `command` (ffmpeg) to write an .avi into
    the current directory, moves the source under "done/", moves the .avi
    into "to_upload/", and sends (upload_name, finalname) to "outbox".

    Class attributes:
        command: shell command template; %(SOURCEFILE)s and
            %(ENCODINGNAME)s are filled in per file. Both are quoted so
            that names containing spaces survive the shell.
    """
    command = 'ffmpeg >transcode.log 2>&1 -i "%(SOURCEFILE)s" -s 640x360 -vcodec mpeg4 -acodec copy -vb 1500000 "%(ENCODINGNAME)s"'

    def main(self):
        """Process jobs until a "control" message arrives, then forward it."""
        while 1:
            for sourcefile in self.Inbox("inbox"):
                shortname = os.path.basename(sourcefile)
                encoding_name = shortname.replace(".mp4", ".avi")
                finalname = sourcefile.replace(".mp4", ".avi")

                # Do the actual transcode
                print("TRANSCODING %s %s" % (sourcefile, encoding_name))
                status = os.system(self.command % {
                    "SOURCEFILE": sourcefile,
                    "ENCODINGNAME": encoding_name,
                })
                if status != 0:
                    # Leave the source where it is so it is not recorded as
                    # done and no broken output gets uploaded.
                    print("TRANSCODE FAILED %s %s" % (sourcefile, status))
                    print("-----------------")
                    continue

                # file is transcoded, move to done
                done_name = os.path.join("done", sourcefile)
                print("MOVING DONE FILE %s %s" % (sourcefile, done_name))
                os.rename(sourcefile, done_name)

                # Move encoded version to upload queue
                upload_name = os.path.join("to_upload", encoding_name)
                print("MOVING TO UPLOAD QUEUE %s %s"
                      % (encoding_name, upload_name))
                os.rename(encoding_name, upload_name)

                # And tell the uploader to upload it please
                print("SETTING OFF UPLOAD %s %s" % (upload_name, finalname))
                self.send((upload_name, finalname), "outbox")
                print("-----------------")

            if self.dataReady("control"):
                break
            # Sleep until something arrives instead of spinning the thread,
            # matching what Uploader does.
            if not self.anyReady():
                self.pause()

        self.send(self.recv("control"), "signal")
class Uploader(Axon.ThreadedComponent.threadedcomponent):
    """Upload each (upload_name, finalname) job from "inbox" by FTP.

    Runs `command` (ftpput) for the file and, if that succeeds, moves the
    file to "encoded/<finalname>". A file whose upload fails stays where it
    is so it can be retried.

    Class attributes:
        command: shell command template for the upload.
        username, password: FTP credentials. They were redacted in the
            original post ("< editted :-) >"); the empty defaults below
            must be replaced with real values before running.
        hostname: FTP server to upload to.
    """
    command = 'ftpput --server=%(HOSTNAME)s --verbose --user=%(USERNAME)s --pass=%(PASSWORD)s --binary --passive "%(UPLOADFILE)s"'
    username = ""  # < editted :-) > -- redacted in the original, fill in
    password = ""  # < editted :-) > -- redacted in the original, fill in
    hostname = "ftp.blip.tv"

    def main(self):
        """Process jobs until a "control" message arrives, then forward it."""
        while 1:
            for (upload_name, finalname) in self.Inbox("inbox"):
                print("UPLOADING %s" % (upload_name,))
                status = os.system(self.command % {
                    "HOSTNAME": self.hostname,
                    "USERNAME": self.username,
                    "PASSWORD": self.password,
                    "UPLOADFILE": upload_name,
                })
                if status != 0:
                    # Do not file the video as uploaded when ftpput failed.
                    print("UPLOAD FAILED %s %s" % (upload_name, status))
                    print("-----------------")
                    continue

                encoded_name = os.path.join("encoded", finalname)
                print("MOVING %s TO %s" % (upload_name, encoded_name))
                os.rename(upload_name, encoded_name)
                print("-----------------")

            if self.dataReady("control"):
                break
            if not self.anyReady():
                self.pause()

        self.send(self.recv("control"), "signal")
# Source of work: every file under ".", sorted, minus anything whose path
# matches one of the bookkeeping names in the pattern.
file_source = Pipeline(
    Find(path=".", walktype="f"),
    Sort(),
    Grep(pattern="(done|encoded|unsorted|transcode.log|to_upload)",
         invert=True),
)

# Would probably be nicer as a customised PAR chassis
job_splitter = TwoWayBalancer()

# Two identical transcode-then-upload workers.
first_worker = Pipeline(
    Transcoder(),
    Uploader(),
)
second_worker = Pipeline(
    Transcoder(),
    Uploader(),
)

# Data flows FILES -> SPLIT -> CONSUME1/2; shutdown follows the same route.
wiring = {
    ("FILES", "outbox"): ("SPLIT", "inbox"),
    ("SPLIT", "outbox1"): ("CONSUME1", "inbox"),
    ("SPLIT", "outbox2"): ("CONSUME2", "inbox"),
    ("FILES", "signal"): ("SPLIT", "control"),
    ("SPLIT", "signal1"): ("CONSUME1", "control"),
    ("SPLIT", "signal2"): ("CONSUME2", "control"),
}

Graphline(
    FILES=file_source,
    SPLIT=job_splitter,
    CONSUME1=first_worker,
    CONSUME2=second_worker,
    linkages=wiring,
).run()
It should be fairly clear that this will go as fast as it can, so please be patient :-)