# Part of the A-A-P recipe executive: Utility functions

# Copyright (C) 2002-2003 Stichting NLnet Labs
# Permission to copy and use this file is specified in the file COPYING.
# If this file is missing you can find it here: http://www.a-a-p.org/COPYING

#
# Util: utility functions
#
# It's OK to do "from Util import *", these things are supposed to be global.
#

import string
import os.path
import types
import sys
import import_re        # import the re module in a special way
import glob

from Error import *
from Message import *


def i18n_init():
    """Set up Internationalisation: setlocale() and gettext().

    Does nothing when a builtin "_" already exists.  Otherwise tries to set
    the locale to the user's default and to install gettext for the "a-a-p"
    domain.  When gettext cannot be used, identity functions are installed
    as the builtins "_" and "N_".
    """
    # Chicken-egg problem: Should give informational messages here, but since
    # the options haven't been parsed yet we don't know if the user wants us to
    # be verbose.  Let's keep quiet.

    # If already done return quickly.
    import __builtin__
    if __builtin__.__dict__.has_key("_"):
        return

    # Set the locale to the users default.
    try:
        import locale
        try:
            locale.setlocale(locale.LC_ALL, '')
        except locale.Error:
            pass        # there is a locale module but it doesn't work
    except ImportError:
        pass            # locale module cannot be imported

    # Set up for translating messages, if possible.
    # When the gettext module is missing this results in an ImportError.
    # An older version of gettext doesn't support install(), it generates an
    # AttributeError.
    # An IOError is generated when the module "a-a-p" cannot be found.
    # If not possible, define the _() and N_() functions to do nothing.
    # Make them builtin so that they are available everywhere.
    # NOTE(review): "N_" is only defined in the fallback path below; confirm
    # that nothing relies on it when gettext.install() succeeds.
    try:
        import gettext
        gettext.install("a-a-p")
    except (ImportError, AttributeError, IOError):
        def nogettext(s):
            return s
        __builtin__.__dict__['_'] = nogettext
        __builtin__.__dict__['N_'] = nogettext


def is_white(c):
    """Return 1 if "c" is a space or a Tab."""
    return c == ' ' or c == '\t'


def skip_white(line, i):
    """Skip whitespace, starting at line[i].
    Return the index of the next non-white character, or past the end of
    "line"."""
    try:
        while line[i] == ' ' or line[i] == '\t':
            i = i + 1
    except IndexError:
        pass            # ran off the end of "line"
    return i


def skip_to_white(line, i):
    """Skip non-whitespace, starting at line[i].
    Return the index of the next white character, or past the end of
    "line"."""
    try:
        while line[i] != ' ' and line[i] != '\t':
            i = i + 1
    except IndexError:
        pass            # ran off the end of "line"
    return i


def get_token(arg, i):
    """Get one white-space separated token from arg[i:].
    Handles single and double quotes and keeps them (see get_item() to
    remove quotes).
    A sequence of white space is also a token.
    Returns the token and the index after it.  When "i" is at or past the
    end of "arg" an empty token is returned together with "i"."""
    arg_len = len(arg)

    # If "arg" starts with white space, return the span of white space.
    # The length check avoids an IndexError when there is nothing left.
    if i < arg_len and is_white(arg[i]):
        e = skip_white(arg, i)
        return arg[i:e], e

    # Isolate the token until white space or end of the argument.  White
    # space inside quotes does not end the token.
    start = i
    inquote = ''
    while i < arg_len:
        c = arg[i]
        if inquote:
            if c == inquote:
                inquote = ''        # end of quoted text
        elif c == "'" or c == '"':
            inquote = c             # start of quoted text
        elif is_white(c):
            break
        i = i + 1

    # Slicing once is equivalent to appending every character to a string.
    return arg[start:i], i


def list2string(lines):
    """Convert a list of strings into a single string."""
    return string.join(lines, '')


def check_exists(rpstack, fname):
    """Give an error message if file "fname" already exists.
    The error is reported with recipe_error() using "rpstack"."""
    if os.path.exists(fname):
        from Process import recipe_error
        recipe_error(rpstack, _('File already exists: "%s"') % fname)


def scopechar(c):
    """Return 1 when "c" is a scope name character, 0 otherwise."""
    return c in string.digits or c in string.letters or c == "_"


def varchar(c):
    """Return 1 when "c" is a variable name character, 0 otherwise."""
    return c in string.digits or c in string.letters or c == "_" or c == "."


def skip_varchars(arg, idx): """Skip over varable name chars in arg[idx:], return the index.""" e = idx try: while 1: c = arg[e] if not (c in string.digits or c in string.letters or c == "_" or c == "."): break e = e + 1 except: pass return e def unquote(recdict, str): """Remove quotes from "str". Assumes aap style quoting.""" res = '' inquote = '' for c in str: if c == inquote: inquote = '' # End of quoted text. elif not inquote and (c == '"' or c == "'"): inquote = c # Start of quoted text. else: res = res + c if inquote: msg_info(recdict, _('Missing quote in "%s"') % str) return res def enquote(s, quote = '"'): """Put quotes around "s" so that it is handled as one item. Uses aap style quoting (a mix of single and double quotes).""" result = quote slen = len(s) i = 0 while i < slen: if s[i] == quote: if quote == '"': result = result + "'\"'\"'" else: result = result + '"\'"\'"' else: result = result + s[i] i = i + 1 return result + quote def double_quote(str): """Put double quotes around "str" when it contains white space or a quote. Contained double quotes are doubled.""" quote = '' for c in str: if c == "'" or c == '"' or is_white(c): quote = '"' break if not quote: return str # no quoting required res = quote for c in str: if c == '"': res = res + '"' res = res + c return res + quote def bs_quote(str): """Escape special characters in "str" with a backslash. 
Special characters are white space, quotes and backslashes.""" res = '' for c in str: if c == '\\' or c == "'" or c == '"' or is_white(c): res = res + '\\' res = res + c return res def get_indent(line): """Count the number of columns of indent at the start of "line".""" i = 0 col = 0 try: while 1: if line[i] == ' ': col = col + 1 elif line[i] == '\t': col = col + 8 - (col % 8) else: break i = i + 1 except IndexError: pass return col def append2var(recdict, scope, varname, str): """Add "str" the variable "scope.varname" for "recdict".""" v = recdict[scope].get(varname) if v: if isinstance(v, ExpandVar): recdict[scope][varname] = ExpandVar(v.getVal() + ' ' + str) else: recdict[scope][varname] = v + ' ' + str else: recdict[scope][varname] = str def add_cleanfiles(recdict, str): """Add "str" the "CLEANFILES" for the recipe with "recdict".""" append2var(recdict, "_recipe", "CLEANFILES", str) def add_distfiles(recdict, str): """Add "str" the "DISTFILES" for the recipe with "recdict".""" append2var(recdict, "_recipe", "DISTFILES", str) def rem_dup(var): """Remove duplicates from a variable string. 
Attributes are ignored when comparing items.""" from Dictlist import dictlist2str, str2dictlist expand = isinstance(var, ExpandVar) if expand: val = var.getVal() else: val = var ndl = [] for d in str2dictlist([], val): dup = 0 for nd in ndl: if d["name"] == nd["name"]: dup = 1 break if not dup: ndl.append(d) s = dictlist2str(ndl) if expand: return ExpandVar(s) return s class Expand: """Kind of expansion used for $VAR.""" quote_none = 0 # no quoting quote_aap = 1 # quoting with " and ' quote_double = 2 # quoting with ", backslash for escaping quote_bs = 3 # escaping with backslash quote_shell = 4 # quoting with backslash or " for the shell def __init__(self, attr = 1, quote = quote_aap, skip_errors = 0): self.attr = attr # include attributes self.quote = quote # quoting with " and ' self.rcstyle = 0 # rc-style expansion self.skip_errors = skip_errors # ignore errors self.optional = 0 # empty when not defined self.slash = 0 # no / to \ replacement class ExpandVar: """ Class to hold a variable value that needs to be expanded when it is used. """ def __init__(self, val): self.val = val def getVal(self): return self.val def __add__(self, other): "Concatenate ExpandVar like a string. Equivalent to $+= in aap." return ExpandVar(self.getVal() + other) def __str__(self): "Expand the value" from Commands import expand return expand(0, Global.globals, self.getVal(), Expand(1, Expand.quote_aap)) def get_var_val_int(recdict, name): """ Get variable for internal use. Get it from "recdict" directly or from the "_up" scope. Do not use the "_no" scope to avoid setting the flag that the variable was read before set. """ if recdict.has_key(name): return recdict[name] return get_var_val(0, recdict, "_up", name) def get_var_val(line_nr, recdict, scope, name, argexpand = None): """ Get the value of variable "name", expanding it when postponed evaluation was specified for the assignment. Use "scope" if it is set. "_no" can be used to search up the stack. 
When the variable "name" does not exist return None. """ from Commands import expand if not scope: val = recdict.get(name) else: try: val = recdict[scope][name] except: val = None # either the scope or the variable doesn't exist if val is None: return None # The variable may have been set by a Python statement, using a type # different from the string we expect. # Automatically convert a number to a string. if isinstance(val, types.IntType) or isinstance(val, types.LongType): val = str(val) if isinstance(val, ExpandVar): val = expand(line_nr, recdict, val.getVal(), Expand(1, Expand.quote_aap)) # Return if no expansion to be done. if not argexpand: return val # $/var: change all slashes to backslashes. if argexpand.slash: val = string.replace(val, '/', '\\') # Return when expanding doesn't change the value: # - Include attributes and aap quoting is the default. # - There are no attributes and aap quoting. if argexpand.quote == Expand.quote_aap and (argexpand.attr or not '{' in val): return val # Remove attributes and/or change the quoting. This is done by turning the # string into a dictlist and then back into a string. # Be permissive about errors and keep whitespace when there are no # attributes are to be removed. 
from Dictlist import dictlist2str, str2dictlist try: dl = str2dictlist([], val) if argexpand.quote != Expand.quote_aap: use_val = 0 else: use_val = 1 for d in dl: if len(d.keys()) > 1: use_val = 0 break if use_val: res = val else: res = dictlist2str(dl, argexpand) except UserError, e: res = val # ignore the error, return unexpanded if not argexpand.skip_errors: from Work import getrpstack msg_warning(recdict, (_('Error expanding "%s": ') % val) + str(e), rpstack = getrpstack(recdict, line_nr)) return res def expand_item(item, argexpand, key = "name"): """Expand one "item" (one entry of a variable converted to a dictlist), according to "argexpand".""" res = expand_itemstr(item[key], argexpand) if argexpand.attr: from Dictlist import dictlistattr2str res = res + dictlistattr2str(item) return res def expand_itemstr(str, argexpand): """Expand the string value of an item accoding to "argexpand".""" if argexpand.quote == Expand.quote_shell: if os.name == "posix": # On Unix a mix of double and single quotes works well # Also escape characters that have a special meaning. from Dictlist import listitem2str res = listitem2str(str, ' \t&;|$<>', '&;|') else: # On MS-Windows double quotes works well res = double_quote(str) else: if argexpand.quote == Expand.quote_none: res = str elif argexpand.quote == Expand.quote_aap: from Dictlist import listitem2str res = listitem2str(str) elif argexpand.quote == Expand.quote_double: res = double_quote(str) else: res = bs_quote(str) return res def has_wildcard(s): """ Return non-zero when "s" contains '*', '?' or '[abc]'. """ return re.search("[*?[]", s) != None def expand_unescape(s): """ Remove wildcard escaping from "s". Currenly only changes "[*]" things to "*". """ # Be quick when nothing to be done. 
ni = string.find(s, '[') if ni < 0: return s res = "" i = 0 while 1: res = res + s[i:ni] if s[ni + 2] == ']': res = res + s[ni + 1] i = ni + 3 else: res = res + '[' i = ni + 1 if i >= len(s): break ni = string.find(s, '[', i) if ni < 0 or ni + 2 >= len(s): return res + s[i:] return res def oct2int(s): """convert string "s", which is an octal number, to an int. Isn't there a standard Python function for this?""" v = 0 for c in s: if not c in string.octdigits: raise UserError, _('non-octal chacacter encountered in "%s"') % s v = v * 8 + int(c) return v def full_fname(name): """Make a full, uniform file name out of "name". Used to be able to compare filenames with "./" and "../" things in them, also after changing directories.""" res = os.path.abspath(os.path.normpath(name)) if os.name == "posix": return res return fname_fold(res) def fname_fold(name): """ Turn a file name into a uniform format: on non-Unix systems change backslashes to slashes and make all characters lowercase. Cannot use os.path.normcase() and os.path.normpath() here, they change slashes to backslashes on MS-Windows. """ if os.name == "posix": return name return string.replace(string.lower(name), '\\', '/') def fname_equal(one, two): """ Return non-zero when "one" and "two" are the same file (or directory). """ if os.name == "posix": # Use samefile() to detect symbolic links. But this only works for # existing files, thus compare the names if it fails. And it's not # supported on all systems. 
try: r = os.path.samefile(one, two) except: r = 0 if r: return r return fname_fold(one) == fname_fold(two) def fname_fold_same(): """Return if fname_fold() always returns a file name unmodified.""" return os.name == "posix" def path_join(first, second): """Special version of os.path.join() that uses the path separator that is being used already.""" c = os.sep if string.find(first, '/') >= 0 and os.sep == '\\': os.sep = '/' ret = os.path.join(first, second) os.sep = c return ret def shorten_name(name, dir = None): """Shorten a file name when it's relative to directory "dir". If "dir" is not given, use the current directory. Prefers using "../" when part of "dir" matches. Also handles backslashes on non-posix systems""" from Remote import is_url if is_url(name): return name # can't shorten a URL adir = dir if adir is None: adir = os.getcwd() dir_len = len(adir) c = adir[dir_len - 1] if c != '/' and c != '\\': adir = adir + os.sep # make sure "adir" ends in a slash dir_len = dir_len + 1 # Skip over the path components that are equal name_len = len(name) i = 0 slash = -1 while i < dir_len and i < name_len: if os.name == "posix": if adir[i] != name[i]: break if adir[i] == '/': slash = i else: if ((adir[i] == '/' or adir[i] == '\\') and (name[i] == '/' or name[i] == '\\')): slash = i elif string.lower(adir[i]) != string.lower(name[i]): break i = i + 1 # For a full match and "name" is a directory pretend "name" ends in a # slash. if (i == name_len and name_len > 0 and i < dir_len and (adir[i] == '/' or adir[i] == '\\') and name[-1] != '/' and name[-1] != '\\' and os.path.isdir(name)): slash = i i = i + 1 # If nothing is equal, return the full name if slash <= 0: return name # For a full match with "adir" return the name without it. if i == dir_len: return name[dir_len:] # Insert "../" for the components in "adir" that are not equal. 
# Example: name = "/foo/bdir/foo.o" # adir = "/foo/test" # result = "../bdir/foo.o" back = '' while i < dir_len: if adir[i] == '/' or adir[i] == '\\': back = back + ".." + os.sep i = i + 1 # Don't include a trailing slash when "name" is part of "adir". if slash + 1 >= name_len and back: return back[:-1] return back + name[slash + 1:] def display_name(name): """Change a file name into a form suitable for displaying: fname.txt (/path/dir/) """ absname = full_fname(name) return os.path.basename(absname) + " (" + os.path.dirname(absname) + ")" def shorten_dictlist(dictlist): """Shorten a dictlist to the current directory. Returns a copy of the dictlist with identical attributes and shortened names.""" adir = os.getcwd() newlist = [] for item in dictlist: new_item = {} for k in item.keys(): if k == "name": if item.has_key("_node") and k == "name": new_item[k] = shorten_name(item["_node"].get_name(), adir) else: new_item[k] = shorten_name(item[k], adir) else: new_item[k] = item[k] newlist.append(new_item) return newlist def aap_checkdir(rpstack, recdict, fname): """Make sure the directory for file "fname" exists.""" dirname = os.path.dirname(fname) if dirname: assert_dir(rpstack, recdict, dirname) def assert_dir(rpstack, recdict, dirname): """Make sure the directory "dirname" exists.""" if not os.path.exists(dirname): msg_info(recdict, _('Creating directory "%s"') % dirname) try: os.makedirs(dirname) except StandardError, e: from Process import recipe_error recipe_error(rpstack, (_('Cannot create directory "%s": ') % dirname) + str(e)) elif not os.path.isdir(dirname): from Process import recipe_error recipe_error(rpstack, _('"%s" exists but is not a directory') % dirname) def dir_contents(dir, recurse = 0, join = 1): """Return a list with the contents of directory "dir". Takes care not to expand wildcards in "dir". When "recurse" is non-zero, recursively get contents of directories. When "join" is zero, don't join "dir" to the resulting list. 
Raises an Exception when something is wrong.""" cwd = os.getcwd() os.chdir(dir) try: # Get contents of "dir". alist = glob.glob("*") if os.name == "posix": alist = alist + glob.glob(".*") # When recursive, get contents of each directory in "dir". if recurse: newlist = [] for item in alist: if os.path.isdir(item): newlist.extend(dir_contents(item, 1)) else: newlist.append(item) alist = newlist finally: os.chdir(cwd) if join: newlist = [] for l in alist: newlist.append(os.path.join(dir, l)) return newlist return alist def deltree(dir): """ Recursively delete a directory or a file. But for a symlink delete the link itself, not a directory it's pointing to. """ if os.path.isdir(dir) and not isalink(dir): for f in dir_contents(dir): deltree(f) os.rmdir(dir) else: os.remove(dir) def isalink(fname): """ Version of os os.readlink() that returns False on non-Unix systems. """ islink = 0 try: islink = os.readlink(fname) except: # Ignore errors, doesn't work on non-Unix systems. pass return islink def date2secs(str): """Convert a string like "12 days" to a number of seconds.""" str_len = len(str) i = 0 while i < str_len: if not str[i] in string.digits: break i = i + 1 if i == 0: raise UserError, _('Must start with a number: "%s"') % str nr = int(str[:i]) i = skip_white(str, i) if str[i:] == "day": return nr * 24 * 60 * 60 if str[i:] == "hour": return nr * 60 * 60 if str[i:] == "min": return nr * 60 if str[i:] == "sec": return nr raise UserError, _('Must have day, hour, min or sec: "%s"') % str def dictlist_sameentries(one, two): """ Compare the entries in dictlists "one" and "two" and return non-zero if they contain the same entries. Only checks the "name" of the entries and ignores the order. """ if len(one) != len(two): return 0 for o in one: found = 0 for t in two: if o["name"] == t["name"]: found = 1 if not found: return 0 return 1 def cflags_normal(): """ Return $OPTIMIZE and $DEBUG in a form that works for most C compilers. 
""" recdict = Global.globals opt = get_var_val(0, recdict, "_no", "OPTIMIZE") if opt: if opt == "size": opt = 's' else: try: opt = int(opt) except: msg_warning(recdict, _("OPTIMIZE must be 'size' or a number: %s") % opt) opt = 0 if opt > 0: opt = "%d" % opt else: opt = None dbg = get_var_val(0, recdict, "_no", "DEBUG") if dbg == "no": dbg = None if opt and dbg: # TODO: check if the compiler accepts both arguments res = "-O%s -g" % opt elif opt: res = "-O%s" % opt elif dbg: res = "-g" else: res = '' return res def get_sys_option(cmd): """Check for system command options. Ignore anything that is not a valid option. Return the remaining command and a dictionary of the options.""" from Dictlist import parse_attr cmd_len = len(cmd) idx = 0 adict = {} while 1: idx = skip_white(cmd, idx) if idx >= cmd_len or cmd[idx] != '{': break try: name, val, idx = parse_attr([], cmd, idx) except UserError: # An error probably means the shell command starts with "{". name = None if name == 'q' or name == 'quiet': adict['quiet'] = val elif name == 'i' or name == 'interactive': adict['interactive'] = val elif name == 'l' or name == 'log': adict['log'] = val elif name == 'f' or name == 'force': adict['force'] = val else: break return cmd[idx:], adict def logged_system(recdict, cmd): """Execute a system command. Display the command and log the output. Returns the return value of os.system().""" # Redirect the output of each line to a file. # Don't do this for lines that contain redirection themselves. # TODO: make this work on non-Posix systems. 
tmpfilex = '' if msg_logname(): from RecPython import tempfname tmpfile = tempfname() if os.name == "posix": tmpfilex = tempfname() else: tmpfile = None newcmd = '' append = "" for line in string.split(cmd, '\n'): if not line: continue # skip empty lines rest, adict = get_sys_option(line) if adict.get("quiet"): msg_log(recdict, line) else: msg_system(recdict, line) if adict.get("interactive"): newcmd = newcmd + rest + '\n' elif adict.get("log") or Global.sys_cmd_log: rd = ">" if not tmpfile: # not logging, throw it in the bit bucket of = "/dev/null" else: of = tmpfile # write or append to the logfile if append: rd = ">>" if os.name == "posix": # The extra "2>&1" just after the command is unexpectedly # needed to redirect stderr, otherwise it's echoed. newcmd = newcmd + ("{ %s 2>&1; echo $? > %s; } 2>&1 %s%s\n" % (rest, tmpfilex, rd, of)) else: if of == "/dev/null": newcmd = newcmd + ("%s\n" % (rest)) else: newcmd = newcmd + ("%s %s%s\n" % (rest, rd, of)) append = "-a " # append next command output elif tmpfile and string.find(rest, '>') < 0: if os.name == "posix": newcmd = newcmd + ("{ %s; echo $? > %s; } 2>&1 | tee %s%s\n" % (rest, tmpfilex, append, tmpfile)) else: rd = ">" if append: rd = ">>" newcmd = newcmd + ("%s %s%s\n" % (rest, rd, tmpfile)) append = "-a " # append next command output else: newcmd = newcmd + rest + '\n' try: if os.name == "posix": res = os.system(newcmd) else: # system() on MS-Windows can handle only one command at a time and # must not end in a NL. Do the same on other non-Unix systems for # now. # TODO: system() isn't available on the Mac # TODO: system() always returns zero for Windows 9x for line in string.split(newcmd, '\n'): if line: # skip empty lines if osname() == "mswin": # Hack: os.system() doesn't always handle command names # with a space in them or with double quotes. Write # the command in a batch file and execute it. 
tmpbat = tempfname() + ".bat" f = open(tmpbat, "w") f.write("@echo off\r\n") f.write(line + "\r\n") f.close() line = tmpbat try: res = os.system(line) finally: if osname() == "mswin": os.unlink(line) if res: break except KeyboardInterrupt: msg_info(recdict, _("Interrupted")) res = 1 if tmpfile: # Read the file that contains the exit status of the executed command. # "res" could be the exit status of "tee". if os.name == "posix": rr = '0' try: f = open(tmpfilex) rr = f.read(10) f.close() except: pass # Be prepared for strange results: Only use the number at the start # of the file. Avoids crashing here. rr = re.sub("^(\\d*)(.|\\s)*", "\\1", rr) if rr and rr != '0': res = int(rr) if res == 0: res = 1 # Append the output to the logfile. try: f = open(tmpfile) text = f.read() f.close() except: text = '' if text: if Global.sys_cmd_log: Global.sys_cmd_log = Global.sys_cmd_log + text else: msg_log(recdict, text) if os.name != "posix" or sys.stdout != sys.__stdout__: # No "tee" command or stdout is going somehwere else, echo # the system command output. print text # Always delete the temp files try_delete(tmpfile) if tmpfilex: try_delete(tmpfilex) return res def redir_system_int(recdict, cmd, use_tee = 1): """Execute "cmd" with the shell and catch the output. Returns two things: a number (non-zero for success) and the output of the command. The caller may want to use msg_log() for the text. """ from RecPython import tempfname msg_system(recdict, cmd) tmpfile = tempfname() if os.name == "posix": tmpexit = tempfname() if use_tee: cmd = "{ %s; echo $? > %s; } 2>&1 | tee %s" % (cmd, tmpexit, tmpfile) else: cmd = "{ %s; echo $? > %s; } 2>&1 > %s" % (cmd, tmpexit, tmpfile) else: if '>' in cmd: # Redirecting output of a command that redirects output itself only # works for stderr, but that is not available on MS-Windows. # Leave the temp file empty. # TODO: copy the redirected output to tmpfile? 
tmpfile = '' else: cmd = "%s > %s" % (cmd, tmpfile) tmpexit = '' try: try: ok = (os.system(cmd) == 0) except StandardError, e: msg_note(recdict, (_('Executing "%s" failed: ') % cmd) + str(e)) ok = 0 except KeyboardInterrupt: msg_note(recdict, _('Executing "%s" interrupted') % cmd) ok = 0 if tmpexit: rr = '0' try: f = open(tmpexit) rr = f.read(1)[0] f.close() except: pass if rr != '0': ok = 0 # Read the output of the command, also when it failed (might explain # why it failed). text = '' if tmpfile: try: f = open(tmpfile) text = f.read() f.close() except StandardError, e: msg_note(recdict, (_('Reading output of "%s" failed: ') % cmd) + str(e)) ok = 0 # always remove the tempfiles, even when system() failed. finally: if tmpexit: try_delete(tmpexit) if tmpfile: try_delete(tmpfile) return ok, text def async_system(rpstack, recdict, cmd): """Run command "cmd" with the shell, without waiting for it to finish.""" cmd, adict = get_sys_option(cmd) if not adict.get("quiet"): msg_system(recdict, cmd) try: if os.name == "posix": # Unix: use fork/exec if not os.fork(): n = os.system(cmd) os._exit(n) else: shell = get_shell_name() os.spawnv(os.P_DETACH, shell, [ "/c", cmd ]) except StandardError, e: from Process import recipe_error recipe_error(rpstack, (_('Could not execute "%s": ') % cmd) + str(e)) def get_shell_name(): """Get the name of the shell to use (either command.com or cmd.exe).""" if os.environ.has_key("SHELL"): shell = os.environ["SHELL"] elif os.environ.has_key("COMSPEC"): shell = os.environ["COMSPEC"] else: from RecPython import program_path shell = program_path("cmd.exe") if not shell: shell = program_path("command.com") return shell def get_progname(recdict, varname, progname, args): """ Get the program name from variable "varname". If it doesn't exist search for program "progname" and use arguments "arg". 
""" n = get_var_val(0, recdict, "_no", varname) if not n: # Use program_path() so that programs in the "aap" directory are also # found (useful for automatically installed packages). from RecPython import program_path n = program_path(progname) if not n: n = progname # not found, use the default. n = n + args return n def assert_aap_dir(recdict): """Create the "AAPDIR" directory if it doesn't exist yet. Return non-zero if it exists or could be created.""" if not os.path.exists(Global.aap_dirname): try: os.mkdir(Global.aap_dirname) except StandardError, e: msg_error(recdict, (_('Warning: Could not create "%s" directory: ') % Global.aap_dirname) + str(e)) return 0 return 1 def in_aap_dir(fname): """Return the path of file "fname" in the aap directory.""" return os.path.join(Global.aap_dirname, fname) def skip_commands(): """Return non-zero when build commands are to be skipped: -n and/or -t command line argument. But don't skip when force_build was set.""" return (not Global.force_build and (Global.cmd_args.options.get("touch") or Global.cmd_args.options.get("nobuild"))) def home_dir(): """Return a directory to read/write files specifically for the current user, E.g., ~/.aap.""" if os.name == "posix": return os.path.expanduser(os.path.join("~", ".aap")) if os.environ.get("HOME"): # Use $HOME/aap. return os.path.join(os.environ.get("HOME"), "aap") if os.environ.get("HOMEDRIVE"): # Use $HOMEDRIVE/HOMEPATH/aap. h = os.environ.get("HOMEDRIVE") if os.environ.get("HOMEPATH"): h = os.path.join(h, os.environ.get("HOMEPATH")) else: h = h + os.sep return os.path.join(h, "aap") if os.path.isdir("c:/"): # Use C:/aap return "c:/aap" # Nothing found... return "" def default_dirs(recdict, homedirfirst = 0): """ Return a list of directories to look for recipes. When "homedirfirst" is set put the home dir before a global dir. "recdict" is used to obtain $AAPSYSDIR. When this variable doesn't exist fall back to the default directory (for filetype detection). 
""" from Dictlist import str2dictlist aapsysdir = get_var_val_int(recdict, "AAPSYSDIR") if aapsysdir is None and os.path.isdir("/usr/local/share/aap"): aapsysdir = "/usr/local/share/aap" if aapsysdir: dirs = map(lambda x: x["name"], str2dictlist([], aapsysdir)) else: dirs = [] homedir = home_dir() if homedir: if homedirfirst: return [ homedir ] + dirs return dirs + [ homedir ] return dirs def get_import_dirs(recdict): """ Return the list of directories from which modules and tools are to be imported. Without the "modules" or "tools" sub-directory name. """ dirs = default_dirs(recdict, homedirfirst = 1) dirs.append(Global.aap_rootdir) return dirs def goto_dir(recdict, dir): """Change to directory "dir" and give a message about it.""" if dir != os.getcwd(): # Note: This message is not translated, so that a parser for the # messages isn't confused by various languages. os.chdir(dir) msg_changedir(recdict, os.getcwd()) # Compiling a regexp is slow. This cache keeps the result. compiled_re = {} def cre_match(regexp, text): """Call re.match while caching the compiled regexp.""" if compiled_re.has_key(regexp): cre = compiled_re[regexp] else: cre = re.compile(regexp) compiled_re[regexp] = cre return cre.match(text) def cre_search(regexp, text): """Call re.search while caching the compiled regexp.""" if compiled_re.has_key(regexp): cre = compiled_re[regexp] else: cre = re.compile(regexp) compiled_re[regexp] = cre return cre.search(text) def osname(): """ Return the os.name in a slightly modified way. Returns "mswin" for all kinds of MS-Windows. Returns "posix" for all kinds of Unix. """ if os.name == "nt" or os.name == "win32": return "mswin" return os.name asroot_in = None asroot_out = None asroot_err = None def do_as_root(recdict, rpstack, cmd): """ Open a connection to a su-shell to execute commands under root permissions. Return 1 for success, zero if failure was detected, 2 if the user refused to execute the command. 
""" global asroot_out, asroot_in, asroot_err import popenerr import select # Ask the user if he really wants to execute this command as root. # TODO: replace unprintable characters. msg_info(recdict, _('Directory: "%s"') % os.getcwd()) msg_info(recdict, _('Command: "%s"') % cmd) r = raw_input(_("Execute as root? y(es)/n(o)/a(bort) ")) if not r or (r[0] != 'y' and r[0] != 'Y'): if r and (r[0] == 'a' or r[0] == 'A'): from Process import recipe_error recipe_error(rpstack, _("Command aborted")) # Skipping a command is not equal to failure. msg_info(recdict, _("Command not executed")) return 2 if asroot_out is None and asroot_in is None: # Run the shell and get the input and output files. msg_info(recdict, _("Starting a separate shell to execute commands as root.")) msg_info(recdict, _("For safety you will be asked to confirm each command passed to the shell.")) msg_info(recdict, _("Please enter the root password:")) asroot_out, asroot_in, asroot_err = popenerr.popen3('su root -c %s' % os.path.join(Global.aap_rootdir, "RootShell.py")) # Loop until logging in has been done. if asroot_err: fdlist = [asroot_out, asroot_err] else: fdlist = [asroot_out] try: while 1: # Read both from stdout and stderr until RootShell has been # started. "su" may write to stderr. inp, outp, exc = select.select(fdlist, [], []) m = inp[0].readline() if not m: from Process import recipe_error recipe_error(rpstack, _('Starting super-user shell failed')) if string.find(m, "RootShell ready") >= 0: msg_info(recdict, _("super-user shell started")) break msg_info(recdict, m) except StandardError, e: from Process import recipe_error recipe_error(rpstack, _('Error while opening root shell: ') + str(e)) if asroot_out is None or asroot_in is None: from Process import recipe_error recipe_error(rpstack, _('Failed to start a root shell')) # Send the directory and the command to the root shell. 
asroot_in.write('C' + os.getcwd() + '\n') asroot_in.write('X' + cmd + '\n') asroot_in.flush() # Loop until the root shell did its work. Echo messages. # Read one character at a time, otherwise a prompt may not be displayed. m = '' while 1: try: c = asroot_out.read(1) except KeyboardInterrupt: msg_error(recdict, 'Executing interrupted for "%s"' % cmd) _close_rootshell_fds() return 0 if not c: msg_error(recdict, 'Executing aborted for "%s"' % cmd) _close_rootshell_fds() return 0 sys.stdout.write(c) sys.stdout.flush() if c == '\n': if m[:13] == "RootShell: OK": return 1 if m[:9] == "RootShell": msg_extra(recdict, m) return 0 # Don't echo here, the write() above already did it. msg_info({"MESSAGE" : ''}, m) m = '' else: m = m + c # NOTREACHED return 0 def close_rootshell(recdict): """Close the root shell.""" if asroot_in: try: asroot_in.write('Q\n') asroot_in.flush() _close_rootshell_fds() except StandardError, e: msg_info(recdict, 'Could not close root shell: %s' % str(e)) else: msg_info(recdict, 'Closed root shell.') def _close_rootshell_fds(): global asroot_out, asroot_in, asroot_err asroot_in.close() asroot_in = None asroot_out.close() asroot_out = None try: asroot_err.close() # see openerr.py why this fails except: pass asroot_err = None def try_delete(fname): """ Try to delete a file, ignore failure. """ try: os.remove(fname) except: pass # vim: set sw=4 et sts=4 tw=79 fo+=l: