"""
Interface to various cdrtools (currently cdrecord, mkisofs, readcd and cdda2wav)
"""
from tools import cmdoutput, striplist, which, TRUE, FALSE
from string import split, join, digits, find, strip, letters, atoi, lower, replace
from types import StringType
from log4py import Logger, LOGLEVEL_NORMAL, LOGLEVEL_DEBUG
from re import match, compile
import os
# Track modes accepted by cdrecord.command_line(); a valid mode is passed to
# cdrecord as a "-<mode>" switch.
TRACK_MODE_DATA = "data"
TRACK_MODE_MODE2 = "mode2"
TRACK_MODE_AUDIO = "audio"
TRACK_MODE_XA1 = "xa1"
TRACK_MODE_XA2 = "xa2"
# Write modes; also emitted as a "-<mode>" switch. cdrecord.command_line()
# forces this mode itself when overburning is requested and supported.
WRITE_MODE_DAO = "dao"
# Blank modes; when cdrecord.blank_mode is set, command_line() emits
# "-blank <mode>" instead of the write options and tracks.
BLANK_MODE_DISC = "disc"
BLANK_MODE_FAST = "fast"
BLANK_MODE_SESSION = "session"
# Special track name that cdrecord.add_track() stores without quoting
# (presumably "read the track from stdin" - confirm against cdrecord docs).
TRACK_STDIN = "-"
class cdrtools:
    """ Looks up the cdrecord and mkisofs executables and collects the
        CD-ROM devices reported by "cdrecord -scanbus". """

    def __init__(self, loglevel = LOGLEVEL_NORMAL):
        """ Sets up logging, locates both commands and scans the bus once. """
        self.__cdrtools_logger = Logger().get_instance(self)
        self.__cdrtools_logger.set_loglevel(loglevel)
        self.__cdrtools_devices = []
        self.__cdrtools_cdrecord_command = which("cdrecord")
        self.__cdrtools_mkisofs_command = which("mkisofs")
        self.__cdrtools_scanbus()

    def devices(self):
        """ Returns the devices found by cdrecord as a list of
            [channel, id, lun, description] entries (all strings). """
        return self.__cdrtools_devices

    def joliet_charsets(self):
        """ Returns a sorted list of all available joliet character sets,
            i.e. every output line of "mkisofs -jcharset help" that follows
            the "Known charsets" header line. """
        command_line = "%s -jcharset help 2>&1" % self.__cdrtools_mkisofs_command
        self.__cdrtools_logger.debug("Reading available charsets: %s" % command_line)
        charsets = []
        header_seen = FALSE
        for line in cmdoutput(command_line, strip = TRUE):
            if (header_seen == TRUE):
                charsets.append(line)
            elif (line[:14] == "Known charsets"):
                header_seen = TRUE
        charsets.sort()
        return charsets

    def __cdrtools_scanbus(self):
        """ Gets the list of available devices by executing "cdrecord -scanbus".

            Only lines of the form "<TAB>bus,target,lun<TAB>..." that mention
            "CD-ROM" are treated as devices. Lines that are too short or do
            not carry such an address are skipped instead of raising an
            IndexError, and addresses with more than one digit per field are
            read completely. """
        command_line = "%s -scanbus 2>&1" % self.__cdrtools_cdrecord_command
        self.__cdrtools_logger.debug("Searching for devices: %s" % command_line)
        address = compile(r"^\t(\d+),(\d+),(\d+)\t")
        # vendor, model and revision are printed as three quoted strings
        inquiry = compile(r"'([^']*)'\s+'([^']*)'\s+'([^']*)'")
        self.__cdrtools_devices = []
        for line in cmdoutput(command_line):
            located = address.match(line)
            if (located is None) or (line.find("CD-ROM") == -1):
                continue
            (channel, target, lun) = located.groups()
            described = inquiry.search(line)
            if (described is not None):
                (vendor, model, revision) = described.groups()
            else:
                # No quoted fields found: fall back to the fixed columns
                # used for single digit addresses.
                (vendor, model, revision) = (line[13:21], line[24:40], line[43:47])
            description = "%s %s %s" % (vendor.strip(), model.strip(), revision.strip())
            self.__cdrtools_devices.append([channel, target, lun, description])
class cdrecord:
    """ Wraps the cdrecord executable for one device (bus, target, lun).

        The public class attributes below are the knobs read by
        command_line(); set them on the instance before asking for the
        command line. The class only builds command lines and queries
        cdrecord for information, it never starts a write itself. """

    valid_track_modes = [ TRACK_MODE_DATA, TRACK_MODE_MODE2, TRACK_MODE_AUDIO, TRACK_MODE_XA1, TRACK_MODE_XA2 ]
    valid_write_modes = [ WRITE_MODE_DAO ]
    valid_blank_modes = [ BLANK_MODE_DISC, BLANK_MODE_FAST, BLANK_MODE_SESSION]

    verbose_mode = FALSE            # emits "-v"
    burnfree = FALSE                # emits "driveropts=burnfree" or "driveropts=burnproof"
    overburn = FALSE                # emits "-overburn" (and forces DAO) if supported
    fixate = TRUE                   # FALSE emits "-nofix"
    dummy_mode = FALSE              # emits "-dummy"
    multisession = FALSE            # emits "-multi"
    eject = TRUE                    # emits "-eject"
    fifosize = 4 * 1024 * 1024 # default: 4MB
    speed = 1                       # emitted as "speed=<n>"
    pad_tracks = FALSE              # chooses between "-pad" and "-nopad"
    swap_audio_tracks = FALSE       # emits "-swab" for audio tracks
    track_mode = ""                 # one of valid_track_modes, otherwise ignored
    write_mode = ""                 # one of valid_write_modes, otherwise ignored
    blank_mode = None               # one of valid_blank_modes; None means "write"
    tsize = ""                      # emitted as "tsize=<value>" unless empty

    def __init__(self, bus, target, lun, device, loglevel = LOGLEVEL_NORMAL):
        """ Stores the device address and reads the cdrecord version.

            bus, target and lun may be integers or numeric strings (strings
            are converted with atoi); device is kept untouched and handed
            back by device(). """
        self.__cdrecord_logger = Logger().get_instance(self)
        self.__cdrecord_logger.set_loglevel(loglevel)
        self.__cdrecord_command = which("cdrecord")
        if (isinstance(bus, StringType)): bus = atoi(bus)
        if (isinstance(target, StringType)): target = atoi(target)
        if (isinstance(lun, StringType)): lun = atoi(lun)
        self.__cdrecord_bus = bus
        self.__cdrecord_target = target
        self.__cdrecord_lun = lun
        self.__cdrecord_device = device
        self.__cdrecord_tracks = []
        self.__cdrecord_version_major = ""
        self.__cdrecord_version_minor = ""
        self.__cdrecord_version_micro = ""
        self.__cdrecord_version_extra = ""
        self.__cdrecord_read_version()

    def version(self):
        """ Returns the version string of cdrecord, e.g. "1.11a34". """
        micro = self.__cdrecord_version_micro
        if (micro is None):
            # No alpha/beta part; a suffix such as "-dvd" is still reported.
            micro = ""
        return "%s.%s%s%s" % (self.__cdrecord_version_major, self.__cdrecord_version_minor, micro, self.__cdrecord_version_extra)

    def device(self):
        """ Returns the device value passed to the constructor. """
        return self.__cdrecord_device

    def driver_opts(self):
        """ Gets the available driver options for a given device.

            Returns a one element list: ["burnfree"] for cdrecord 1.11a02
            and later (and for every major version other than 1), otherwise
            ["burnproof"].
            Note: this function doesn't use the cdrecord driveropts yet;
            the answer is derived from the version number only. """
        # TODO: ask the drive instead, e.g.
        #   cdrecord -checkdrive dev=<bus>,<target>,<lun> driveropts=help
        if (self.__cdrecord_feature_available("a02") == TRUE):
            return ["burnfree"]
        return ["burnproof"]

    def overburn_supported(self):
        """ Version check whether overburn is supported or not: TRUE for
            cdrecord 1.11a01 and later and for every major version other
            than 1, FALSE otherwise. """
        return self.__cdrecord_feature_available("a01")

    def previous_session(self):
        """ Returns the last output line of "cdrecord -msinfo" for the
            device, or None if there is no output, the line mentions
            "cannot" or it does not start with a digit. """
        command_line = "%s -msinfo dev=%d,%d,%d 2>&1" % (self.__cdrecord_command, self.__cdrecord_bus, self.__cdrecord_target, self.__cdrecord_lun)
        self.__cdrecord_logger.debug("Reading previous session: %s" % command_line)
        output = cmdoutput(command_line, TRUE)
        if (len(output) == 0):
            return None
        last_line = output[-1]
        if (last_line == "") or (last_line.lower().find("cannot") != -1) or (not last_line[0] in digits):
            return None
        return last_line

    def add_track(self, value):
        """ Appends a track to be written. TRACK_STDIN is stored as is,
            every other value is wrapped in double quotes. """
        if (value == TRACK_STDIN):
            self.__cdrecord_tracks.append(value)
        else:
            self.__cdrecord_tracks.append("\"%s\"" % value)

    def command_line(self):
        """ Returns the complete cdrecord command line for the current
            settings.

            With blank_mode unset the write options and all added tracks
            are emitted; otherwise only "-blank <mode>" is appended (if the
            mode is valid). Side effects: string values of fifosize and
            speed are converted to integers in place, and write_mode is
            set to WRITE_MODE_DAO when overburning is requested and
            supported. """
        parts = ["%s" % (self.__cdrecord_command,)]
        writing = (self.blank_mode is None)

        if (self.verbose_mode): parts.append("-v")
        if (self.burnfree):
            options = self.driver_opts()
            if ("burnfree" in options):
                parts.append("driveropts=burnfree")
            elif ("burnproof" in options):
                parts.append("driveropts=burnproof")
        if (self.dummy_mode): parts.append("-dummy")
        if (self.eject): parts.append("-eject")

        if (writing):
            if (self.overburn) and (self.overburn_supported() == TRUE):
                # must happen before write_mode is evaluated below
                self.write_mode = WRITE_MODE_DAO
                parts.append("-overburn")
            if (self.tsize != ""): parts.append("tsize=%s" % (self.tsize,))
            if (self.fixate == FALSE): parts.append("-nofix")
            if (self.multisession): parts.append("-multi")
            if (self.pad_tracks):
                parts.append("-pad")
            else:
                parts.append("-nopad")
            track_mode = self.track_mode.lower()
            if (track_mode in cdrecord.valid_track_modes):
                parts.append("-%s" % track_mode)
            write_mode = self.write_mode.lower()
            if (write_mode in cdrecord.valid_write_modes):
                parts.append("-%s" % write_mode)
            # compare the lowered mode so "AUDIO" behaves like "audio" here too
            if (track_mode == TRACK_MODE_AUDIO) and (self.swap_audio_tracks):
                parts.append("-swab")

        if (isinstance(self.fifosize, StringType)): self.fifosize = atoi(self.fifosize)
        parts.append("-fs=%d" % self.fifosize)
        parts.append("dev=%d,%d,%d" % (self.__cdrecord_bus, self.__cdrecord_target, self.__cdrecord_lun))
        if (isinstance(self.speed, StringType)): self.speed = atoi(self.speed)
        parts.append("speed=%d" % self.speed)

        if (writing):
            parts.extend(self.__cdrecord_tracks)
        else:
            blank_mode = self.blank_mode.lower()
            if (blank_mode in cdrecord.valid_blank_modes):
                parts.append("-blank %s" % blank_mode)
        return " ".join(parts)

    def get_write_speed(self):
        """ Returns the maximum speed for writing CDs/DVDs as a tuple
            (cd_speed, dvd_speed), parsed from the "Maximum write speed"
            line of "cdrecord -prcap". Both default to 1 if no such line
            is found; if several lines match, the last one wins. """
        cmdline = "%s dev=%d,%d,%d driveropts=help -checkdrive -prcap" % (self.__cdrecord_command, self.__cdrecord_bus, self.__cdrecord_target, self.__cdrecord_lun)
        regexp = compile(r".*Maximum\ write\ speed:[\d\s]+kB\/s\ \(CD\s+(\d+)x\,\ DVD\s+(\d+)x.*")
        max_write_speed_cd = 1
        max_write_speed_dvd = 1
        for line in cmdoutput(cmdline):
            matchobject = regexp.match(line)
            if (matchobject is not None):
                max_write_speed_cd = int(matchobject.group(1))
                max_write_speed_dvd = int(matchobject.group(2))
        return (max_write_speed_cd, max_write_speed_dvd)

    # Private methods of the cdrecord class

    def __cdrecord_feature_available(self, first_alpha):
        """ Returns TRUE if this cdrecord is at least version
            1.11<first_alpha> (e.g. "a02"), FALSE otherwise. Every major
            version other than 1 is treated as new enough. A 1.11 without
            alpha part counts as newer than any 1.11 alpha; the alpha part
            only matters for minor version 11 itself. """
        if (atoi(self.__cdrecord_version_major) != 1):
            return TRUE
        minor = atoi(self.__cdrecord_version_minor)
        if (minor > 11):
            return TRUE
        if (minor < 11):
            return FALSE
        micro = self.__cdrecord_version_micro
        if (micro is None) or (micro >= first_alpha):
            return TRUE
        return FALSE

    def __cdrecord_read_version(self):
        """ Reads the version string by executing "cdrecord -version".

            The second word of the first output line is taken as the
            version. For "1.11a34-dvd" this yields major "1", minor "11",
            micro "a34" and extra "-dvd"; micro is None when nothing
            stands between the digits and the optional "-..." suffix.
            Raises IndexError if the first line has no second word or the
            version contains no ".". """
        output = cmdoutput("%s -version 2>&1" % self.__cdrecord_command, strip = TRUE)
        words = output[0].split(" ")
        version = words[1].split(".")
        self.__cdrecord_version_major = version[0]
        # leading digits / part up to the first dash / dash and everything after it
        pieces = compile(r"^(\d*)([^-]*)(.*)$").match(version[1])
        self.__cdrecord_version_minor = pieces.group(1)
        if (pieces.group(2) != ""):
            self.__cdrecord_version_micro = pieces.group(2)
        else:
            self.__cdrecord_version_micro = None
        self.__cdrecord_version_extra = pieces.group(3)
class mkisofs:
    """ Assembles command lines for the mkisofs executable. The public
        class attributes below are read by command_line(); set them on the
        instance and add files with add_file() before asking for the
        command line. """

    verbose_mode = FALSE                # emits "-v"
    disable_deep_relocation = TRUE      # emits "-D"
    full_iso9660_filenames = FALSE      # emits "-l"
    allow_leading_dots = FALSE          # emits "-L"
    follow_links = TRUE                 # emits "-f"
    joliet_charset = None               # emits "-J -jcharset <charset>" unless None
    rational_rock = TRUE                # emits "-r"
    rock_ridge = TRUE                   # emits "-R"
    omot_trailing_periods = TRUE        # emits "-d" (attribute name kept for compatibility)
    volume_id = ""                      # emits -V "<id>" unless empty
    output_file = ""                    # emits -o "<file>" unless empty
    boot_image = ""                     # emitted as -b "<image>" -c "<catalog>" ...
    boot_catalog = ""                   # ... only if both values are set
    gui_behaviour = FALSE               # emits "-gui" if mkisofs knows that option
    print_size = FALSE                  # emits "-print-size"

    def __init__(self, loglevel = LOGLEVEL_NORMAL):
        """ Sets up logging, locates mkisofs and checks for "-gui" support. """
        self.__mkisofs_logger = Logger().get_instance(self)
        self.__mkisofs_logger.set_loglevel(loglevel)
        self.__mkisofs_command = which("mkisofs")
        self.__mkisofs_gui_behaviour_supported = FALSE
        self.__mkisofs_check_gui_parameter()
        self.__mkisofs_multisession_magic_parameters = None
        self.__mkisofs_multisession_device = None
        self.__mkisofs_files = []

    def multi_session(self, magic_parameters, device):
        """ Stores the values emitted as "-C <magic_parameters> -M <device>".
            A magic_parameters value of None leaves the settings untouched. """
        if (magic_parameters is not None):
            self.__mkisofs_multisession_magic_parameters = magic_parameters
            self.__mkisofs_multisession_device = device

    def add_file(self, filename):
        """ Adds a file or directory to the image; duplicates are ignored.
            The name is normalised before the duplicate check so that the
            same path is not stored twice on platforms where os.sep is
            not "/". """
        # mkisofs always requires "/" as separator, even on win32 platforms!
        if (os.sep != "/"):
            filename = filename.replace(os.sep, "/")
        if (filename not in self.__mkisofs_files):
            self.__mkisofs_files.append(filename)

    def command_line(self):
        """ Returns the complete command line including all set parameters.
            Directories are added as graft points named after their last
            path component, plain files are added as is. """
        parts = ["%s" % (self.__mkisofs_command,)]
        if (self.verbose_mode): parts.append("-v")
        if (self.__mkisofs_gui_behaviour_supported) and (self.gui_behaviour): parts.append("-gui")
        if (self.disable_deep_relocation): parts.append("-D")
        if (self.full_iso9660_filenames): parts.append("-l")
        if (self.allow_leading_dots): parts.append("-L")
        if (self.follow_links): parts.append("-f")
        if (self.joliet_charset is not None):
            parts.append("-J -jcharset %s" % (self.joliet_charset,))
        if (self.__mkisofs_multisession_magic_parameters is not None):
            parts.append("-C %s -M %s" % (self.__mkisofs_multisession_magic_parameters, self.__mkisofs_multisession_device))
        if (self.rational_rock): parts.append("-r")
        if (self.rock_ridge): parts.append("-R")
        if (self.omot_trailing_periods): parts.append("-d")
        if (self.volume_id != ""): parts.append("-V \"%s\"" % (self.volume_id,))
        if (self.output_file != ""): parts.append("-o \"%s\"" % (self.output_file,))
        if ((self.boot_catalog != "") and (self.boot_image != "")):
            parts.append("-b \"%s\" -c \"%s\"" % (self.boot_image, self.boot_catalog))
        if (self.print_size): parts.append("-print-size")
        if (len(self.__mkisofs_files) > 0):
            parts.append("-graft-points")
            for filename in self.__mkisofs_files:
                if (os.path.isdir(filename)):
                    if (filename[-1] == "/"):
                        filename = filename[:-1]
                    # add_file() stored the name with "/" separators, so
                    # split on "/" and not on os.sep
                    shortfilename = filename.split("/")[-1]
                    parts.append("\"/%s/\"=\"%s\"" % (shortfilename, filename))
                else:
                    parts.append("\"%s\"" % (filename,))
        return " ".join(parts)

    def __mkisofs_check_gui_parameter(self):
        """ Records whether the output of a bare mkisofs call mentions
            "gui", which is taken as support for the "-gui" option. """
        output = cmdoutput("%s 2>&1 | grep gui" % self.__mkisofs_command)
        if (len(output) == 0):
            self.__mkisofs_gui_behaviour_supported = FALSE
        else:
            self.__mkisofs_gui_behaviour_supported = TRUE
class readcd:
    """ Assembles the command line of the readcd executable for one
        device. Set the "filename" attribute before calling
        command_line(). """

    filename = None     # value placed after "-f" on the command line

    def __init__(self, bus, target, lun, loglevel = LOGLEVEL_NORMAL):
        """ Sets up logging, locates readcd and stores the device address.
            bus, target and lun given as strings are converted with atoi. """
        self.__readcd_logger = Logger().get_instance(self)
        self.__readcd_logger.set_loglevel(loglevel)
        self.__readcd_command = which("readcd")
        if (type(bus) == StringType):
            bus = atoi(bus)
        if (type(target) == StringType):
            target = atoi(target)
        if (type(lun) == StringType):
            lun = atoi(lun)
        (self.__readcd_bus, self.__readcd_target, self.__readcd_lun) = (bus, target, lun)

    def command_line(self):
        """ Returns the readcd call for the stored device, followed by
            -f "<filename>". """
        address = (self.__readcd_bus, self.__readcd_target, self.__readcd_lun)
        with_device = "%s dev=%d,%d,%d" % ((self.__readcd_command,) + address)
        return "%s -f \"%s\"" % (with_device, self.filename)
class cdda2wav:
    """ Queries the cdda2wav executable for its version and for the
        track listing of the disc in one device (bus, target, lun). """

    def __init__(self, bus, target, lun, loglevel = LOGLEVEL_NORMAL):
        """ Sets up logging, locates cdda2wav, stores the device address
            (converted with int) and reads the cdda2wav version. """
        self.__cdda2wav_logger = Logger().get_instance(self)
        self.__cdda2wav_logger.set_loglevel(loglevel)
        self.__cdda2wav_command = which("cdda2wav")
        self.__cdda2wav_bus = int(bus)
        self.__cdda2wav_target = int(target)
        self.__cdda2wav_lun = int(lun)
        self.__cdda2wav_read_version()

    def __cdda2wav_read_version(self):
        """ Reads major and minor version (one digit each) from the first
            output line of "cdda2wav --version". Falls back to 1.0 when
            the line does not match or when there is no output at all. """
        command = "%s --version 2>&1" % self.__cdda2wav_command
        self.__cdda2wav_logger.debug("Executing %s" % command)
        output = cmdoutput(command)
        matchobject = None
        if (len(output) > 0):
            matchobject = compile(r".*version\ (\d)\.(\d).*").match(output[0])
        if (matchobject is not None):
            self.__cdda2wav_version_major = int(matchobject.group(1))
            self.__cdda2wav_version_minor = int(matchobject.group(2))
        else:
            self.__cdda2wav_version_major = 1
            self.__cdda2wav_version_minor = 0

    def get_version(self):
        """ Returns the cdda2wav version as "<major>.<minor>". """
        return "%d.%d" % (self.__cdda2wav_version_major, self.__cdda2wav_version_minor)

    def get_track_info(self):
        """ Runs cdda2wav in table-of-contents mode and returns a
            dictionary with the keys

            "error_message"     "" or a description of a recognised failure
            "track_lengths"     dictionary track number -> length string
            "number_of_tracks"  string, only present if a "Tracks" line
                                was found
            "total_time"        string, only present if that line carries
                                a second field

            The verbosity switch depends on the major version: "-v 2"
            below 2, "-v toc" otherwise. """
        command = "%s -D %d,%d,%d -N -J -g -Q" % (self.__cdda2wav_command, self.__cdda2wav_bus, self.__cdda2wav_target, self.__cdda2wav_lun)
        if (self.__cdda2wav_version_major < 2):
            command = "%s -v 2" % command
        else:
            command = "%s -v toc" % command
        command = "%s 2>&1" % command
        self.__cdda2wav_logger.debug("Executing %s" % command)
        tracklengths = {}
        trackinfo = {}
        trackinfo["error_message"] = ""
        for raw_line in cmdoutput(command):
            line = raw_line.strip()
            if (line == ""):
                continue
            if (match(r"Tracks.*", line)):
                # skip the 7 character "Tracks:" prefix; splitting on any
                # whitespace tolerates repeated blanks and missing fields
                fields = line[7:].split()
                if (len(fields) > 0):
                    trackinfo["number_of_tracks"] = fields[0]
                if (len(fields) > 1):
                    trackinfo["total_time"] = fields[1]
            elif (match(r"T\d.*", line)):
                # fixed columns: track number after the "T", length in 13-20
                tracknum = atoi(line[1:3])
                tracklengths[tracknum] = line[13:21].strip()
            elif (match(r".*Permission.*denied.*", line)):
                trackinfo["error_message"] = "Permission denied while trying to read the CD content"
            elif (match(r".*Read.*TOC.*size.*failed.*", line)):
                trackinfo["error_message"] = "Failed to read CD content"
        trackinfo["track_lengths"] = tracklengths
        return trackinfo
def test():
    """ Manual smoke test: prints the cdda2wav version and the track
        information of the disc in device 0,0,0 with debug logging. """
    reader = cdda2wav(0, 0, 0, LOGLEVEL_DEBUG)
    version = reader.get_version()
    print(version)
    track_info = reader.get_track_info()
    print(track_info)
# Run the smoke test only when the module is executed as a script.
if __name__ == "__main__":
    test()
# syntax highlighted by Code2HTML, v. 0.9.1