# Part of the A-A-P recipe executive: Utility functions
# Copyright (C) 2002-2003 Stichting NLnet Labs
# Permission to copy and use this file is specified in the file COPYING.
# If this file is missing you can find it here: http://www.a-a-p.org/COPYING
#
# Util: utility functions
#
# It's OK to do "from Util import *", these things are supposed to be global.
#
import string
import os.path
import types
import sys
import import_re # import the re module in a special way
import glob
from Error import *
from Message import *
def i18n_init():
    """Set up Internationalisation: setlocale() and gettext().

    Makes _() and N_() available as builtins, so that they can be used in
    every module.  _() translates a message when gettext is usable, otherwise
    it returns its argument unmodified.  N_() always returns its argument.
    Returns quickly when _() was already installed."""
    # Chicken-egg problem: Should give informational messages here, but since
    # the options haven't been parsed yet we don't know if the user wants us to
    # be verbose.  Let's keep quiet.

    # If already done return quickly.
    import __builtin__
    builtin_names = __builtin__.__dict__
    if builtin_names.has_key("_"):
        return

    # Set the locale to the users default.
    try:
        import locale
        try:
            locale.setlocale(locale.LC_ALL, '')
        except locale.Error:
            pass        # there is a locale module but it doesn't work
    except ImportError:
        pass            # locale module cannot be imported

    def nogettext(s):
        return s

    # Set up for translating messages, if possible.
    # When the gettext module is missing this results in an ImportError.
    # An older version of gettext doesn't support install(), it generates an
    # AttributeError.
    # An IOError is generated when the module "a-a-p" cannot be found.
    # If not possible, define the _() function to do nothing.
    try:
        import gettext
        gettext.install("a-a-p")
    except (ImportError, AttributeError, IOError):
        builtin_names['_'] = nogettext

    # gettext.install() only installs _().  Define N_() in all cases, it used
    # to be missing when setting up gettext succeeded.
    builtin_names['N_'] = nogettext
def is_white(c):
    """Tell whether "c" is white space: true for a space or a Tab only."""
    return c in (' ', '\t')
def skip_white(line, i):
    """Move forward over spaces and Tabs in "line", starting at index "i".
    Returns the index of the first character that is not white, which is the
    length of "line" when only white space follows."""
    pos = i
    try:
        while line[pos] in (' ', '\t'):
            pos = pos + 1
    except IndexError:
        # Ran off the end of "line".
        pass
    return pos
def skip_to_white(line, i):
    """Move forward over everything but spaces and Tabs in "line", starting
    at index "i".  Returns the index of the first space or Tab, which is the
    length of "line" when there is none."""
    pos = i
    try:
        while line[pos] not in (' ', '\t'):
            pos = pos + 1
    except IndexError:
        # Ran off the end of "line".
        pass
    return pos
def get_token(arg, i):
    """Get one white-space separated token from arg[i:].
    Handles single and double quotes and keeps them (see get_item() to
    remove quotes).  White space inside quotes does not end the token.
    A sequence of white space is also a token.
    Returns the token and the index after it.  When "i" is at or past the
    end of "arg" the token is an empty string and "i" is returned as-is."""
    arg_len = len(arg)

    # Nothing left: used to fail with an IndexError.
    if i >= arg_len:
        return '', i

    # If "arg" starts with white space, return the span of white space.
    if is_white(arg[i]):
        e = skip_white(arg, i)
        return arg[i:e], e

    # Isolate the token until white space or end of the argument.  All the
    # characters are consecutive, thus a slice can be used for the result.
    inquote = ''
    start = i
    while i < arg_len:
        c = arg[i]
        if inquote:
            if c == inquote:
                inquote = ''        # matching quote ends the quoted part
        elif c == "'" or c == '"':
            inquote = c
        elif is_white(c):
            break
        i = i + 1
    return arg[start:i], i
def list2string(lines):
    """Convert a list of strings into a single string.
    The items are concatenated without anything in between; an empty list
    results in an empty string."""
    # Joining is the standard way to do this fast, no need to go through a
    # cStringIO object.
    return "".join(lines)
def check_exists(rpstack, fname):
    """Report a recipe error, using "rpstack", when "fname" exists already.
    Does nothing when "fname" does not exist."""
    if not os.path.exists(fname):
        return
    from Process import recipe_error
    recipe_error(rpstack, _('File already exists: "%s"') % fname)
def scopechar(c):
    """Tell whether "c" can be used in a scope name: true for a digit, a
    letter or an underscore."""
    alnum = c in string.digits or c in string.letters
    return alnum or c == "_"
def varchar(c):
    """Tell whether "c" can be used in a variable name: true for a digit, a
    letter, an underscore or a dot."""
    alnum = c in string.digits or c in string.letters
    return alnum or c == "_" or c == "."
def skip_varchars(arg, idx):
    """Skip over variable name chars in arg[idx:], return the index.
    Variable name chars are digits, letters, "_" and ".".  The result is the
    index of the first other character, or the length of "arg" when there is
    none."""
    e = idx
    try:
        while 1:
            c = arg[e]
            if not (c in string.digits or c in string.letters
                    or c == "_" or c == "."):
                break
            e = e + 1
    except IndexError:
        # Reached the end of "arg".  Only catch this error, other errors
        # indicate a real problem and must not be hidden.
        pass
    return e
def unquote(recdict, str):
    """Return "str" with the quotes removed, assuming aap style quoting: text
    in single quotes may contain double quotes and the other way around.
    Gives an informational message, using "recdict", when a quote is not
    closed."""
    unquoted = ''
    open_quote = ''
    for ch in str:
        if not open_quote and (ch == '"' or ch == "'"):
            open_quote = ch         # Start of quoted text.
        elif ch == open_quote:
            open_quote = ''         # End of quoted text.
        else:
            unquoted = unquoted + ch
    if open_quote:
        msg_info(recdict, _('Missing quote in "%s"') % str)
    return unquoted
def enquote(s, quote = '"'):
    """Return "s" with "quote" put around it, so that it is handled as one
    item.  Uses aap style quoting (a mix of single and double quotes): every
    "quote" character inside "s" is replaced by a sequence that uses the other
    kind of quote."""
    # What to insert for a quote character that appears in "s".
    if quote == '"':
        replacement = "'\"'\"'"
    else:
        replacement = '"\'"\'"'

    quoted = quote
    for ch in s:
        if ch == quote:
            quoted = quoted + replacement
        else:
            quoted = quoted + ch
    return quoted + quote
def double_quote(str):
    """Return "str" in double quotes when it contains white space or a quote,
    otherwise return it unmodified.  Double quotes inside "str" are
    doubled."""
    # Find out if quoting is needed at all.
    needs_quotes = 0
    for c in str:
        if c == "'" or c == '"' or is_white(c):
            needs_quotes = 1
            break
    if not needs_quotes:
        return str

    res = '"'
    for c in str:
        if c == '"':
            res = res + '""'
        else:
            res = res + c
    return res + '"'
def bs_quote(str):
    """Return "str" with a backslash inserted before every special
    character.  Special characters are white space, quotes and
    backslashes."""
    escaped = ''
    for ch in str:
        if ch in ('\\', "'", '"') or is_white(ch):
            escaped = escaped + '\\' + ch
        else:
            escaped = escaped + ch
    return escaped
def get_indent(line):
    """Return the width of the indent of "line" in columns.  A space counts
    for one column, a Tab moves to the next multiple of eight."""
    width = 0
    for ch in line:
        if ch == ' ':
            width = width + 1
        elif ch == '\t':
            width = width + 8 - (width % 8)
        else:
            break
    return width
def append2var(recdict, scope, varname, str):
    """Append "str" to the variable "scope.varname" in "recdict", separated
    with a space.  When the variable is empty or does not exist it is set to
    "str".  A value that is an ExpandVar remains an ExpandVar."""
    variables = recdict[scope]
    old = variables.get(varname)
    if not old:
        variables[varname] = str
    elif isinstance(old, ExpandVar):
        variables[varname] = ExpandVar(old.getVal() + ' ' + str)
    else:
        variables[varname] = old + ' ' + str
def add_cleanfiles(recdict, str):
    """Append "str" to "CLEANFILES" in the "_recipe" scope of "recdict"."""
    varname = "CLEANFILES"
    append2var(recdict, "_recipe", varname, str)
def add_distfiles(recdict, str):
    """Append "str" to "DISTFILES" in the "_recipe" scope of "recdict"."""
    varname = "DISTFILES"
    append2var(recdict, "_recipe", varname, str)
def rem_dup(var):
    """Return the variable string "var" with duplicate items removed.  Only
    the name of an item is compared, attributes are ignored; the first item
    with a name is the one that is kept.
    When "var" is an ExpandVar the result is an ExpandVar as well."""
    from Dictlist import dictlist2str, str2dictlist

    expand = isinstance(var, ExpandVar)
    if expand:
        val = var.getVal()
    else:
        val = var

    unique = []
    for entry in str2dictlist([], val):
        for kept in unique:
            if entry["name"] == kept["name"]:
                break               # seen before, drop it
        else:
            unique.append(entry)

    s = dictlist2str(unique)
    if expand:
        return ExpandVar(s)
    return s
class Expand:
    """Specifies how a $VAR is to be expanded: whether attributes are
    included and what kind of quoting is used."""

    # Possible values for the "quote" argument and attribute.
    quote_none = 0          # no quoting
    quote_aap = 1           # quoting with " and '
    quote_double = 2        # quoting with ", backslash for escaping
    quote_bs = 3            # escaping with backslash
    quote_shell = 4         # quoting with backslash or " for the shell

    def __init__(self, attr = 1, quote = quote_aap, skip_errors = 0):
        # Settings that can be specified when creating the object.
        self.attr = attr                    # include attributes
        self.quote = quote                  # one of the quote_ values
        self.skip_errors = skip_errors      # ignore errors

        # Settings that are off until explicitly changed.
        self.rcstyle = 0                    # rc-style expansion
        self.optional = 0                   # empty when not defined
        self.slash = 0                      # no / to \ replacement
class ExpandVar:
    """
    Holds the value of a variable for which expansion is postponed until the
    variable is used.
    """
    def __init__(self, val):
        # The value as it was assigned, not expanded.
        self.val = val

    def getVal(self):
        "Return the value without expanding it."
        return self.val

    def __add__(self, other):
        "Concatenate ExpandVar like a string.  Equivalent to $+= in aap."
        unexpanded = self.getVal() + other
        return ExpandVar(unexpanded)

    def __str__(self):
        "Expand the value, using the global variables and aap quoting."
        from Commands import expand
        how = Expand(1, Expand.quote_aap)
        return expand(0, Global.globals, self.getVal(), how)
def get_var_val_int(recdict, name):
    """
    Get the value of variable "name" for internal use.  When "recdict" itself
    has the variable its value is returned as-is.  Otherwise the result of
    get_var_val() for the "_up" scope is returned.  The "_no" scope is not
    used, to avoid setting the flag that the variable was read before set.
    """
    if not recdict.has_key(name):
        return get_var_val(0, recdict, "_up", name)
    return recdict[name]
def get_var_val(line_nr, recdict, scope, name, argexpand = None):
"""
Get the value of variable "name", expanding it when postponed evaluation
was specified for the assignment.
Use "scope" if it is set. "_no" can be used to search up the stack.
When the variable "name" does not exist return None.
"""
from Commands import expand
if not scope:
val = recdict.get(name)
else:
try:
val = recdict[scope][name]
except:
val = None # either the scope or the variable doesn't exist
if val is None:
return None
# The variable may have been set by a Python statement, using a type
# different from the string we expect.
# Automatically convert a number to a string.
if isinstance(val, types.IntType) or isinstance(val, types.LongType):
val = str(val)
if isinstance(val, ExpandVar):
val = expand(line_nr, recdict, val.getVal(),
Expand(1, Expand.quote_aap))
# Return if no expansion to be done.
if not argexpand:
return val
# $/var: change all slashes to backslashes.
if argexpand.slash:
val = string.replace(val, '/', '\\')
# Return when expanding doesn't change the value:
# - Include attributes and aap quoting is the default.
# - There are no attributes and aap quoting.
if argexpand.quote == Expand.quote_aap and (argexpand.attr
or not '{' in val):
return val
# Remove attributes and/or change the quoting. This is done by turning the
# string into a dictlist and then back into a string.
# Be permissive about errors and keep whitespace when there are no
# attributes are to be removed.
from Dictlist import dictlist2str, str2dictlist
try:
dl = str2dictlist([], val)
if argexpand.quote != Expand.quote_aap:
use_val = 0
else:
use_val = 1
for d in dl:
if len(d.keys()) > 1:
use_val = 0
break
if use_val:
res = val
else:
res = dictlist2str(dl, argexpand)
except UserError, e:
res = val # ignore the error, return unexpanded
if not argexpand.skip_errors:
from Work import getrpstack
msg_warning(recdict, (_('Error expanding "%s": ') % val) + str(e),
rpstack = getrpstack(recdict, line_nr))
return res
def expand_item(item, argexpand, key = "name"):
    """Return the string for "item", which is one entry of a variable that
    was converted to a dictlist.  item[key] is quoted as specified with
    "argexpand".  The attributes of "item" are appended when "argexpand" asks
    for them."""
    text = expand_itemstr(item[key], argexpand)
    if not argexpand.attr:
        return text
    from Dictlist import dictlistattr2str
    return text + dictlistattr2str(item)
def expand_itemstr(str, argexpand):
    """Return "str", the string value of an item, with the kind of quoting
    that "argexpand" specifies."""
    kind = argexpand.quote

    if kind == Expand.quote_shell:
        if os.name != "posix":
            # On MS-Windows double quotes works well
            return double_quote(str)
        # On Unix a mix of double and single quotes works well
        # Also escape characters that have a special meaning.
        from Dictlist import listitem2str
        return listitem2str(str, ' \t&;|$<>', '&;|')

    if kind == Expand.quote_none:
        return str

    if kind == Expand.quote_aap:
        from Dictlist import listitem2str
        return listitem2str(str)

    if kind == Expand.quote_double:
        return double_quote(str)

    # Anything else is handled like Expand.quote_bs.
    return bs_quote(str)
def has_wildcard(s):
    """
    Return non-zero when "s" contains a wildcard character: '*', '?' or '['.
    """
    found = re.search("[*?[]", s)
    return found != None
def expand_unescape(s):
    """
    Remove wildcard escaping from "s".
    Currently only changes "[*]" things to "*": a '[', one character and a
    ']' are replaced by that one character.
    A '[' that is not followed by a character and a ']' is kept as it is,
    also when it is at the end of "s" (this used to cause an IndexError).
    """
    # Be quick when nothing to be done.
    if '[' not in s:
        return s

    slen = len(s)
    res = ""
    i = 0
    while i < slen:
        # Only look two characters ahead when they exist.
        if s[i] == '[' and i + 2 < slen and s[i + 2] == ']':
            res = res + s[i + 1]
            i = i + 3
        else:
            res = res + s[i]
            i = i + 1
    return res
def oct2int(s):
    """Return the value of string "s", which must be an octal number, as an
    int.  An empty string results in zero.
    Raises a UserError when "s" contains something else than an octal
    digit."""
    value = 0
    for digit in s:
        if digit not in string.octdigits:
            raise UserError(_('non-octal chacacter encountered in "%s"') % s)
        value = value * 8 + int(digit)
    return value
def full_fname(name):
    """Return "name" as a full, uniform file name: normalized and made
    absolute.  On a system that is not posix the name is also passed through
    fname_fold().  Used to be able to compare filenames with "./" and "../"
    things in them, also after changing directories."""
    absname = os.path.abspath(os.path.normpath(name))
    if os.name != "posix":
        absname = fname_fold(absname)
    return absname
def fname_fold(name):
    """
    Return file name "name" in a uniform format.  On a posix system that is
    "name" itself.  On other systems all characters are made lowercase and
    backslashes are changed to slashes.  Cannot use os.path.normcase() and
    os.path.normpath() here, they change slashes to backslashes on MS-Windows.
    """
    if os.name != "posix":
        lowered = string.lower(name)
        return string.replace(lowered, '\\', '/')
    return name
def fname_equal(one, two):
    """
    Return non-zero when "one" and "two" are the same file (or directory).
    On a posix system os.path.samefile() is tried first.  When that fails or
    says the files differ, the names are compared after fname_fold().
    """
    if os.name == "posix":
        # Use samefile() to detect symbolic links.  But this only works for
        # existing files, thus compare the names if it fails.  And it's not
        # supported on all systems.
        try:
            same = os.path.samefile(one, two)
        except:
            same = 0
        if same:
            return same
    return fname_fold(one) == fname_fold(two)
def fname_fold_same():
    """Tell whether fname_fold() always returns a file name unmodified, which
    is the case on a posix system."""
    on_posix = (os.name == "posix")
    return on_posix
def path_join(first, second):
    """Special version of os.path.join() that uses the path separator that is
    being used already: when os.sep is a backslash and "first" contains a
    slash, os.sep is temporarily set to a slash.
    Returns the joined path."""
    saved_sep = os.sep
    if saved_sep == '\\' and '/' in first:
        os.sep = '/'
    try:
        return os.path.join(first, second)
    finally:
        # Always restore the global separator, also when joining failed.
        os.sep = saved_sep
def shorten_name(name, dir = None):
    """Shorten a file name when it's relative to directory "dir".
    If "dir" is not given, use the current directory.
    Prefers using "../" when part of "dir" matches.
    Also handles backslashes on non-posix systems.
    A URL and a name that has no leading path component in common with "dir"
    are returned unmodified."""
    from Remote import is_url
    if is_url(name):
        return name             # can't shorten a URL

    adir = dir
    if adir is None:
        adir = os.getcwd()
    dir_len = len(adir)
    c = adir[dir_len - 1]
    if c != '/' and c != '\\':
        adir = adir + os.sep            # make sure "adir" ends in a slash
        dir_len = dir_len + 1

    # Skip over the path components that are equal
    # "i" ends up at the first character that differs, "slash" at the last
    # path separator that "adir" and "name" have in common.
    name_len = len(name)
    i = 0
    slash = -1
    while i < dir_len and i < name_len:
        if os.name == "posix":
            if adir[i] != name[i]:
                break
            if adir[i] == '/':
                slash = i
        else:
            # Slash and backslash are equal, case is ignored.
            if ((adir[i] == '/' or adir[i] == '\\')
                    and (name[i] == '/' or name[i] == '\\')):
                slash = i
            elif string.lower(adir[i]) != string.lower(name[i]):
                break
        i = i + 1

    # For a full match and "name" is a directory pretend "name" ends in a
    # slash.
    if (i == name_len
            and name_len > 0
            and i < dir_len
            and (adir[i] == '/' or adir[i] == '\\')
            and name[-1] != '/'
            and name[-1] != '\\'
            and os.path.isdir(name)):
        slash = i
        i = i + 1

    # If nothing is equal, return the full name
    # (a common separator at index zero only also counts as nothing).
    if slash <= 0:
        return name

    # For a full match with "adir" return the name without it.
    if i == dir_len:
        return name[dir_len:]

    # Insert "../" for the components in "adir" that are not equal.
    # Example: name   = "/foo/bdir/foo.o"
    #          adir   = "/foo/test"
    #          result = "../bdir/foo.o"
    back = ''
    while i < dir_len:
        if adir[i] == '/' or adir[i] == '\\':
            back = back + ".." + os.sep
        i = i + 1

    # Don't include a trailing slash when "name" is part of "adir".
    if slash + 1 >= name_len and back:
        return back[:-1]

    return back + name[slash + 1:]
def display_name(name):
    """Return file name "name" in a form suitable for displaying, the last
    component of the full name first and the directory in parentheses:
        fname.txt (/path/dir) """
    absname = full_fname(name)
    tail = os.path.basename(absname)
    head = os.path.dirname(absname)
    return tail + " (" + head + ")"
def shorten_dictlist(dictlist):
    """Return a copy of "dictlist" in which each "name" is shortened relative
    to the current directory.  All other keys are copied unmodified.
    When an item has a "_node" key, the name is obtained from the get_name()
    method of that node."""
    adir = os.getcwd()
    shortened = []
    for item in dictlist:
        new_item = {}
        for key in item.keys():
            value = item[key]
            if key == "name":
                if item.has_key("_node"):
                    value = shorten_name(item["_node"].get_name(), adir)
                else:
                    value = shorten_name(value, adir)
            new_item[key] = value
        shortened.append(new_item)
    return shortened
def aap_checkdir(rpstack, recdict, fname):
    """Make sure the directory that file "fname" is in exists, using
    assert_dir().  Nothing is done when "fname" has no directory part."""
    parent = os.path.dirname(fname)
    if not parent:
        return
    assert_dir(rpstack, recdict, parent)
def assert_dir(rpstack, recdict, dirname):
"""Make sure the directory "dirname" exists."""
if not os.path.exists(dirname):
msg_info(recdict, _('Creating directory "%s"') % dirname)
try:
os.makedirs(dirname)
except StandardError, e:
from Process import recipe_error
recipe_error(rpstack, (_('Cannot create directory "%s": ')
% dirname) + str(e))
elif not os.path.isdir(dirname):
from Process import recipe_error
recipe_error(rpstack, _('"%s" exists but is not a directory') % dirname)
def dir_contents(dir, recurse = 0, join = 1):
    """Return a list with the names of the items in directory "dir".  Takes
    care not to expand wildcards in "dir", by making it the current directory
    while listing it.  The original directory is always restored.
    When "recurse" is non-zero each directory in the list is replaced by its
    contents, recursively.
    When "join" is zero the names are relative to "dir", otherwise "dir" is
    prepended.
    Raises an Exception when something is wrong."""
    startdir = os.getcwd()
    os.chdir(dir)
    try:
        # Get contents of "dir".  On Unix "*" skips names starting with a dot.
        entries = glob.glob("*")
        if os.name == "posix":
            entries = entries + glob.glob(".*")

        # When recursive, get contents of each directory in "dir".
        if recurse:
            expanded = []
            for entry in entries:
                if os.path.isdir(entry):
                    expanded.extend(dir_contents(entry, 1))
                else:
                    expanded.append(entry)
            entries = expanded
    finally:
        os.chdir(startdir)

    if not join:
        return entries

    joined = []
    for entry in entries:
        joined.append(os.path.join(dir, entry))
    return joined
def deltree(dir):
    """
    Delete "dir".  When it is a directory everything in it is deleted first,
    recursively.  For a symlink and for a file only the item itself is
    deleted, thus not a directory that a symlink is pointing to.
    """
    if not os.path.isdir(dir) or isalink(dir):
        os.remove(dir)
        return
    for f in dir_contents(dir):
        deltree(f)
    os.rmdir(dir)
def isalink(fname):
    """
    Version of os.readlink() that does not fail.  Returns what os.readlink()
    returns for "fname".  Returns zero when os.readlink() fails, which
    also happens on non-Unix systems.
    """
    try:
        return os.readlink(fname)
    except:
        # Ignore errors, doesn't work on non-Unix systems.
        return 0
def date2secs(str):
    """Convert a string like "12 days" to a number of seconds.
    "str" must be a number, optionally followed by white space, and then one
    of the units "day", "hour", "min" or "sec".  A unit may be followed by an
    "s", e.g., "2 hours".
    Raises a UserError when "str" does not have this form."""
    # Find the end of the number.
    str_len = len(str)
    i = 0
    while i < str_len and str[i] in string.digits:
        i = i + 1
    if i == 0:
        raise UserError(_('Must start with a number: "%s"') % str)
    nr = int(str[:i])

    i = skip_white(str, i)
    unit = str[i:]

    secs_per_unit = {"day": 24 * 60 * 60, "hour": 60 * 60, "min": 60, "sec": 1}
    factor = secs_per_unit.get(unit)
    if factor is None and unit[-1:] == 's':
        # Also accept the plural form, as in the example above.  This used to
        # be rejected.
        factor = secs_per_unit.get(unit[:-1])
    if factor is None:
        raise UserError(_('Must have day, hour, min or sec: "%s"') % str)
    return nr * factor
def dictlist_sameentries(one, two):
    """
    Compare the entries in dictlists "one" and "two" and return non-zero if
    they contain the same entries.  Only checks the "name" of the entries and
    ignores the order.
    Returns zero when the number of entries differs or when a name appears in
    only one of the two lists.
    """
    if len(one) != len(two):
        return 0

    # Collect the names of both lists and compare them in both directions.
    # Checking only that every name in "one" is in "two" wrongly accepted
    # lists with duplicate names, such as "a a" and "a b".
    names_one = {}
    for entry in one:
        names_one[entry["name"]] = 1
    names_two = {}
    for entry in two:
        names_two[entry["name"]] = 1

    if names_one == names_two:
        return 1
    return 0
def cflags_normal():
    """
    Return $OPTIMIZE and $DEBUG in a form that works for most C compilers:
    "-O" followed by a number or by "s" for optimizing, "-g" for debugging.
    The result is an empty string when neither applies.
    Gives a warning when $OPTIMIZE is not "size" and not a number.
    """
    recdict = Global.globals

    opt = get_var_val(0, recdict, "_no", "OPTIMIZE")
    if opt == "size":
        opt = 's'
    elif opt:
        try:
            opt = int(opt)
        except:
            msg_warning(recdict,
                      _("OPTIMIZE must be 'size' or a number: %s") % opt)
            opt = 0
        # Zero and negative numbers mean no optimizing.
        if opt > 0:
            opt = "%d" % opt
        else:
            opt = None

    dbg = get_var_val(0, recdict, "_no", "DEBUG")
    if dbg == "no":
        dbg = None

    if opt:
        res = "-O%s" % opt
        if dbg:
            # TODO: check if the compiler accepts both arguments
            res = res + " -g"
    elif dbg:
        res = "-g"
    else:
        res = ''
    return res
def get_sys_option(cmd):
    """Check for system command options at the start of "cmd".  The options
    are attributes in braces: "q" or "quiet", "i" or "interactive", "l" or
    "log" and "f" or "force".  Parsing stops at anything that is not a valid
    option.
    Return the remaining command and a dictionary of the options, with the
    long name of each option as the key."""
    from Dictlist import parse_attr

    # Maps each accepted option name to the key used in the result.
    long_names = {'q': 'quiet', 'quiet': 'quiet',
                  'i': 'interactive', 'interactive': 'interactive',
                  'l': 'log', 'log': 'log',
                  'f': 'force', 'force': 'force'}

    cmd_len = len(cmd)
    idx = 0
    adict = {}
    while 1:
        idx = skip_white(cmd, idx)
        if idx >= cmd_len or cmd[idx] != '{':
            break
        try:
            name, val, idx = parse_attr([], cmd, idx)
        except UserError:
            # An error probably means the shell command starts with "{".
            name = None

        key = long_names.get(name)
        if key is None:
            break
        adict[key] = val

    return cmd[idx:], adict
def logged_system(recdict, cmd):
"""Execute a system command.
Display the command and log the output.
Returns the return value of os.system()."""
# Redirect the output of each line to a file.
# Don't do this for lines that contain redirection themselves.
# TODO: make this work on non-Posix systems.
tmpfilex = ''
if msg_logname():
from RecPython import tempfname
tmpfile = tempfname()
if os.name == "posix":
tmpfilex = tempfname()
else:
tmpfile = None
newcmd = ''
append = ""
for line in string.split(cmd, '\n'):
if not line:
continue # skip empty lines
rest, adict = get_sys_option(line)
if adict.get("quiet"):
msg_log(recdict, line)
else:
msg_system(recdict, line)
if adict.get("interactive"):
newcmd = newcmd + rest + '\n'
elif adict.get("log") or Global.sys_cmd_log:
rd = ">"
if not tmpfile: # not logging, throw it in the bit bucket
of = "/dev/null"
else:
of = tmpfile # write or append to the logfile
if append:
rd = ">>"
if os.name == "posix":
# The extra "2>&1" just after the command is unexpectedly
# needed to redirect stderr, otherwise it's echoed.
newcmd = newcmd + ("{ %s 2>&1; echo $? > %s; } 2>&1 %s%s\n"
% (rest, tmpfilex, rd, of))
else:
if of == "/dev/null":
newcmd = newcmd + ("%s\n" % (rest))
else:
newcmd = newcmd + ("%s %s%s\n" % (rest, rd, of))
append = "-a " # append next command output
elif tmpfile and string.find(rest, '>') < 0:
if os.name == "posix":
newcmd = newcmd + ("{ %s; echo $? > %s; } 2>&1 | tee %s%s\n"
% (rest, tmpfilex, append, tmpfile))
else:
rd = ">"
if append:
rd = ">>"
newcmd = newcmd + ("%s %s%s\n" % (rest, rd, tmpfile))
append = "-a " # append next command output
else:
newcmd = newcmd + rest + '\n'
try:
if os.name == "posix":
res = os.system(newcmd)
else:
# system() on MS-Windows can handle only one command at a time and
# must not end in a NL. Do the same on other non-Unix systems for
# now.
# TODO: system() isn't available on the Mac
# TODO: system() always returns zero for Windows 9x
for line in string.split(newcmd, '\n'):
if line: # skip empty lines
if osname() == "mswin":
# Hack: os.system() doesn't always handle command names
# with a space in them or with double quotes. Write
# the command in a batch file and execute it.
tmpbat = tempfname() + ".bat"
f = open(tmpbat, "w")
f.write("@echo off\r\n")
f.write(line + "\r\n")
f.close()
line = tmpbat
try:
res = os.system(line)
finally:
if osname() == "mswin":
os.unlink(line)
if res:
break
except KeyboardInterrupt:
msg_info(recdict, _("Interrupted"))
res = 1
if tmpfile:
# Read the file that contains the exit status of the executed command.
# "res" could be the exit status of "tee".
if os.name == "posix":
rr = '0'
try:
f = open(tmpfilex)
rr = f.read(10)
f.close()
except:
pass
# Be prepared for strange results: Only use the number at the start
# of the file. Avoids crashing here.
rr = re.sub("^(\\d*)(.|\\s)*", "\\1", rr)
if rr and rr != '0':
res = int(rr)
if res == 0:
res = 1
# Append the output to the logfile.
try:
f = open(tmpfile)
text = f.read()
f.close()
except:
text = ''
if text:
if Global.sys_cmd_log:
Global.sys_cmd_log = Global.sys_cmd_log + text
else:
msg_log(recdict, text)
if os.name != "posix" or sys.stdout != sys.__stdout__:
# No "tee" command or stdout is going somehwere else, echo
# the system command output.
print text
# Always delete the temp files
try_delete(tmpfile)
if tmpfilex:
try_delete(tmpfilex)
return res
def redir_system_int(recdict, cmd, use_tee = 1):
    """Execute "cmd" with the shell and catch the output.
    On a posix system the output is also displayed, using "tee", unless
    "use_tee" is zero.
    Returns two things: a number (non-zero for success) and the output of
    the command.  The caller may want to use msg_log() for the text.
    """
    from RecPython import tempfname

    msg_system(recdict, cmd)

    # "tmpfile" gets the output, "tmpexit" the exit status of "cmd".
    tmpfile = tempfname()
    if os.name == "posix":
        tmpexit = tempfname()
        if use_tee:
            cmd = "{ %s; echo $? > %s; } 2>&1 | tee %s" % (cmd, tmpexit, tmpfile)
        else:
            cmd = "{ %s; echo $? > %s; } 2>&1 > %s" % (cmd, tmpexit, tmpfile)
    else:
        if '>' in cmd:
            # Redirecting output of a command that redirects output itself only
            # works for stderr, but that is not available on MS-Windows.
            # Leave the temp file empty.
            # TODO: copy the redirected output to tmpfile?
            tmpfile = ''
        else:
            cmd = "%s > %s" % (cmd, tmpfile)
        tmpexit = ''

    try:
        try:
            ok = (os.system(cmd) == 0)
        except StandardError, e:
            msg_note(recdict, (_('Executing "%s" failed: ') % cmd) + str(e))
            ok = 0
        except KeyboardInterrupt:
            msg_note(recdict, _('Executing "%s" interrupted') % cmd)
            ok = 0

        if tmpexit:
            # Anything but a zero as the first character of the exit status
            # means failure.  When the file cannot be read, rely on the value
            # returned by system().
            rr = '0'
            try:
                f = open(tmpexit)
                rr = f.read(1)[0]
                f.close()
            except:
                pass
            if rr != '0':
                ok = 0

        # Read the output of the command, also when it failed (might explain
        # why it failed).
        text = ''
        if tmpfile:
            try:
                f = open(tmpfile)
                text = f.read()
                f.close()
            except StandardError, e:
                msg_note(recdict,
                        (_('Reading output of "%s" failed: ') % cmd) + str(e))
                ok = 0

    finally:
        # always remove the tempfiles, even when system() failed.
        if tmpexit:
            try_delete(tmpexit)
        if tmpfile:
            try_delete(tmpfile)

    return ok, text
def async_system(rpstack, recdict, cmd):
"""Run command "cmd" with the shell, without waiting for it to finish."""
cmd, adict = get_sys_option(cmd)
if not adict.get("quiet"):
msg_system(recdict, cmd)
try:
if os.name == "posix":
# Unix: use fork/exec
if not os.fork():
n = os.system(cmd)
os._exit(n)
else:
shell = get_shell_name()
os.spawnv(os.P_DETACH, shell, [ "/c", cmd ])
except StandardError, e:
from Process import recipe_error
recipe_error(rpstack, (_('Could not execute "%s": ') % cmd) + str(e))
def get_shell_name():
    """Get the name of the shell to use (either command.com or cmd.exe).
    $SHELL is used when it is set, otherwise $COMSPEC.  When neither is set
    program_path() is used to find "cmd.exe" and then "command.com"."""
    for varname in ("SHELL", "COMSPEC"):
        try:
            return os.environ[varname]
        except KeyError:
            pass        # not set, try the next one

    from RecPython import program_path
    shell = program_path("cmd.exe")
    if not shell:
        shell = program_path("command.com")
    return shell
def get_progname(recdict, varname, progname, args):
    """
    Return the value of variable "varname" when it is set.  Otherwise return
    the path of program "progname" with "args" appended.  When the program
    cannot be found "progname" itself is used.
    """
    n = get_var_val(0, recdict, "_no", varname)
    if n:
        return n

    # Use program_path() so that programs in the "aap" directory are also
    # found (useful for automatically installed packages).
    from RecPython import program_path
    found = program_path(progname)
    if not found:
        found = progname        # not found, use the default.
    return found + args
def assert_aap_dir(recdict):
"""Create the "AAPDIR" directory if it doesn't exist yet.
Return non-zero if it exists or could be created."""
if not os.path.exists(Global.aap_dirname):
try:
os.mkdir(Global.aap_dirname)
except StandardError, e:
msg_error(recdict, (_('Warning: Could not create "%s" directory: ')
% Global.aap_dirname) + str(e))
return 0
return 1
def in_aap_dir(fname):
    """Return "fname" with the name of the aap directory,
    Global.aap_dirname, prepended."""
    aapdir = Global.aap_dirname
    return os.path.join(aapdir, fname)
def skip_commands():
    """Return non-zero when build commands are to be skipped: the "touch"
    and/or "nobuild" command line option was given.  But don't skip when
    force_build was set."""
    skipping = not Global.force_build
    if skipping:
        options = Global.cmd_args.options
        skipping = options.get("touch") or options.get("nobuild")
    return skipping
def home_dir():
    """Return a directory to read/write files specifically for the current
    user, E.g., ~/.aap.  On a system that is not posix this is the "aap"
    directory in $HOME, in $HOMEDRIVE plus $HOMEPATH, or "c:/aap".
    Returns an empty string when nothing usable was found."""
    if os.name == "posix":
        return os.path.expanduser(os.path.join("~", ".aap"))

    # Use $HOME/aap.
    home = os.environ.get("HOME")
    if home:
        return os.path.join(home, "aap")

    # Use $HOMEDRIVE/HOMEPATH/aap.
    drive = os.environ.get("HOMEDRIVE")
    if drive:
        homepath = os.environ.get("HOMEPATH")
        if homepath:
            base = os.path.join(drive, homepath)
        else:
            base = drive + os.sep
        return os.path.join(base, "aap")

    # Use C:/aap
    if os.path.isdir("c:/"):
        return "c:/aap"

    # Nothing found...
    return ""
def default_dirs(recdict, homedirfirst = 0):
    """
    Return a list of directories to look for recipes: the directories in
    $AAPSYSDIR and the directory from home_dir(), if there is one.
    When "homedirfirst" is set put the home dir before a global dir.
    "recdict" is used to obtain $AAPSYSDIR.  When this variable doesn't exist
    fall back to the default directory (for filetype detection), if that
    directory exists.
    """
    from Dictlist import str2dictlist

    aapsysdir = get_var_val_int(recdict, "AAPSYSDIR")
    fallback = "/usr/local/share/aap"
    if aapsysdir is None and os.path.isdir(fallback):
        aapsysdir = fallback

    dirs = []
    if aapsysdir:
        for item in str2dictlist([], aapsysdir):
            dirs.append(item["name"])

    homedir = home_dir()
    if not homedir:
        return dirs
    if homedirfirst:
        return [ homedir ] + dirs
    return dirs + [ homedir ]
def get_import_dirs(recdict):
    """
    Return the list of directories from which modules and tools are to be
    imported: what default_dirs() returns with the home dir first, followed
    by Global.aap_rootdir.  Without the "modules" or "tools" sub-directory
    name.
    """
    return default_dirs(recdict, homedirfirst = 1) + [ Global.aap_rootdir ]
def goto_dir(recdict, dir):
    """Make "dir" the current directory and give a message about it.
    Nothing happens when "dir" equals what os.getcwd() returns."""
    if dir == os.getcwd():
        return
    os.chdir(dir)
    # Note: This message is not translated, so that a parser for the
    # messages isn't confused by various languages.
    msg_changedir(recdict, os.getcwd())
# Compiling a regexp is slow.  This cache keeps the result, the key is the
# regexp as a string.
compiled_re = {}

def _cre_compiled(regexp):
    """Return the compiled form of "regexp", from the cache when it was used
    before."""
    try:
        return compiled_re[regexp]
    except KeyError:
        cre = re.compile(regexp)
        compiled_re[regexp] = cre
        return cre

def cre_match(regexp, text):
    """Like re.match(regexp, text), using the cache of compiled regexps."""
    return _cre_compiled(regexp).match(text)

def cre_search(regexp, text):
    """Like re.search(regexp, text), using the cache of compiled regexps."""
    return _cre_compiled(regexp).search(text)
def osname():
    """
    Return the os.name in a slightly modified way.
    Returns "mswin" when os.name is "nt" or "win32", thus for all kinds of
    MS-Windows.
    Returns os.name itself otherwise, e.g., "posix" for all kinds of Unix.
    """
    if os.name in ("nt", "win32"):
        return "mswin"
    return os.name
# The files connected to the root shell that do_as_root() starts with
# popenerr.popen3().  They are None until the root shell is started and are
# reset to None by _close_rootshell_fds().
asroot_in = None        # commands for the root shell are written here
asroot_out = None       # the output of the root shell is read from here
asroot_err = None       # third file from popen3(); when set it is also read
                        # while the root shell is starting
def do_as_root(recdict, rpstack, cmd):
    """
    Open a connection to a su-shell to execute commands under root
    permissions.
    The user is asked to confirm executing "cmd".  The root shell is started
    the first time and is then used again for following commands.
    Return 1 for success, zero if failure was detected, 2 if the user refused
    to execute the command.
    Reports a recipe error, using "rpstack", when the user aborts or when the
    root shell cannot be started.
    """
    global asroot_out, asroot_in, asroot_err
    import popenerr
    import select

    # Ask the user if he really wants to execute this command as root.
    # TODO: replace unprintable characters.
    msg_info(recdict, _('Directory: "%s"') % os.getcwd())
    msg_info(recdict, _('Command: "%s"') % cmd)
    r = raw_input(_("Execute as root? y(es)/n(o)/a(bort) "))
    if not r or (r[0] != 'y' and r[0] != 'Y'):
        if r and (r[0] == 'a' or r[0] == 'A'):
            from Process import recipe_error
            recipe_error(rpstack, _("Command aborted"))
        # Skipping a command is not equal to failure.
        msg_info(recdict, _("Command not executed"))
        return 2

    if asroot_out is None and asroot_in is None:
        # Run the shell and get the input and output files.
        msg_info(recdict, _("Starting a separate shell to execute commands as root."))
        msg_info(recdict, _("For safety you will be asked to confirm each command passed to the shell."))
        msg_info(recdict, _("Please enter the root password:"))
        asroot_out, asroot_in, asroot_err = popenerr.popen3('su root -c %s'
                         % os.path.join(Global.aap_rootdir, "RootShell.py"))

        # Loop until logging in has been done.
        if asroot_err:
            fdlist = [asroot_out, asroot_err]
        else:
            fdlist = [asroot_out]
        try:
            while 1:
                # Read both from stdout and stderr until RootShell has been
                # started.  "su" may write to stderr.
                inp, outp, exc = select.select(fdlist, [], [])
                m = inp[0].readline()
                if not m:
                    # An empty line means end-of-file: the shell is gone.
                    from Process import recipe_error
                    recipe_error(rpstack, _('Starting super-user shell failed'))
                if string.find(m, "RootShell ready") >= 0:
                    msg_info(recdict, _("super-user shell started"))
                    break
                msg_info(recdict, m)
        except StandardError, e:
            from Process import recipe_error
            recipe_error(rpstack, _('Error while opening root shell: ') + str(e))

    if asroot_out is None or asroot_in is None:
        from Process import recipe_error
        recipe_error(rpstack, _('Failed to start a root shell'))

    # Send the directory and the command to the root shell.
    # The first character of a line tells what it is: 'C' for the directory,
    # 'X' for the command.
    asroot_in.write('C' + os.getcwd() + '\n')
    asroot_in.write('X' + cmd + '\n')
    asroot_in.flush()

    # Loop until the root shell did its work.  Echo messages.
    # Read one character at a time, otherwise a prompt may not be displayed.
    # "m" collects the characters of the current line.
    m = ''
    while 1:
        try:
            c = asroot_out.read(1)
        except KeyboardInterrupt:
            msg_error(recdict, 'Executing interrupted for "%s"' % cmd)
            _close_rootshell_fds()
            return 0
        if not c:
            # End-of-file: the root shell exited.
            msg_error(recdict, 'Executing aborted for "%s"' % cmd)
            _close_rootshell_fds()
            return 0
        sys.stdout.write(c)
        sys.stdout.flush()
        if c == '\n':
            # A line starting with "RootShell" is the result of the command.
            if m[:13] == "RootShell: OK":
                return 1
            if m[:9] == "RootShell":
                msg_extra(recdict, m)
                return 0
            # Don't echo here, the write() above already did it.
            msg_info({"MESSAGE" : ''}, m)
            m = ''
        else:
            m = m + c

    # NOTREACHED
    return 0
def close_rootshell(recdict):
"""Close the root shell."""
if asroot_in:
try:
asroot_in.write('Q\n')
asroot_in.flush()
_close_rootshell_fds()
except StandardError, e:
msg_info(recdict, 'Could not close root shell: %s' % str(e))
else:
msg_info(recdict, 'Closed root shell.')
def _close_rootshell_fds():
    """Close the files connected to the root shell and set asroot_in,
    asroot_out and asroot_err to None.  An error from closing asroot_err is
    ignored, an error from closing one of the other two is not caught
    here."""
    global asroot_out, asroot_in, asroot_err
    asroot_in.close()
    asroot_in = None
    asroot_out.close()
    asroot_out = None
    try:
        asroot_err.close()      # see openerr.py why this fails
    except:
        pass
    asroot_err = None
def try_delete(fname):
    """
    Delete file "fname" if possible.  Every failure is silently ignored,
    e.g., when the file does not exist.
    """
    try:
        os.unlink(fname)
    except:
        pass
# vim: set sw=4 et sts=4 tw=79 fo+=l:
# syntax highlighted by Code2HTML, v. 0.9.1