# $Id: cgi.py,v 1.31 2004/03/24 12:14:31 jribbens Exp $

"""Object-oriented CGI interface."""

import sys
import re
import os
import errno

try:
    import Cookie
except ImportError:  # Python 3 moved the module
    from http import cookies as Cookie

try:
    import cStringIO as StringIO
except ImportError:
    try:
        import StringIO
    except ImportError:  # Python 3 only has io.StringIO
        import io as StringIO

try:
    _text_type = unicode
except NameError:  # Python 3: str is already the text type
    _text_type = str


class Error(Exception):
    """The base class for all exceptions thrown by this module."""
    pass


class SequencingError(Error):
    """The exception thrown when functions are called out of order.

    For example, if you try to call a function altering the headers of your
    output when the headers have already been sent.
    """
    pass


_url_encre = re.compile(r"[^A-Za-z0-9_.!~*()-]")  # RFC 2396 section 2.3
_url_decre = re.compile(r"%([0-9A-Fa-f]{2})")
_html_encre = re.compile("[&<>\"'+]")
# '+' is encoded because it is special in UTF-7, which the browser may select
# automatically if the content-type header does not specify the character
# encoding. This is paranoia and is not bulletproof, but it does no harm. See
# section 4 of www.microsoft.com/technet/security/news/csoverv.mspx
_html_encodes = {
    "&": "&amp;",
    "<": "&lt;",
    ">": "&gt;",
    "\"": "&quot;",
    "'": "&#39;",
    "+": "&#43;",
}


def html_encode(raw):
    """Return the string parameter HTML-encoded.

    Specifically, the following characters are encoded as entities:
      & < > " ' +

    Anything that is not already a text string is passed through str() first.
    """
    if not isinstance(raw, _text_type):
        raw = str(raw)
    return _html_encre.sub(lambda m: _html_encodes[m.group(0)], raw)


def url_encode(raw):
    """Return the string parameter URL-encoded.

    Every character outside the RFC 2396 "unreserved" set is replaced by
    %XX, where XX is the character's ordinal in upper-case hex.
    """
    if not isinstance(raw, _text_type):
        raw = str(raw)
    return _url_encre.sub(lambda m: "%%%02X" % ord(m.group(0)), raw)


def url_decode(enc):
    """Return the string parameter URL-decoded (including '+' -> ' ')."""
    s = enc.replace("+", " ")
    return _url_decre.sub(lambda m: chr(int(m.group(1), 16)), s)


# Sentinel meaning "no value could be found"; compared by identity only.
__UNDEF__ = []


def _lookup(name, frame, locls):
    """Find 'name' as seen from 'frame'.

    Returns a (where, value) pair, where 'where' is "local", "global" or
    "builtin". If the name cannot be resolved, returns (None, __UNDEF__).
    """
    if name in locls:
        return "local", locls[name]
    if name in frame.f_globals:
        return "global", frame.f_globals[name]
    if "__builtins__" in frame.f_globals:
        builtins = frame.f_globals["__builtins__"]
        # __builtins__ is the module in __main__ but a plain dict in every
        # other module, so both shapes have to be handled.
        if isinstance(builtins, dict):
            if name in builtins:
                return "builtin", builtins[name]
        elif hasattr(builtins, name):
            return "builtin", getattr(builtins, name)
    return None, __UNDEF__


def _scanvars(reader, frame, locls):
    """Collect the names used on one logical source line.

    'reader' is a readline-style callable supplying the source. Tokens are
    read up to the first NEWLINE. Returns a list of (name, where, value)
    tuples; for attribute references 'where' is the dotted prefix (for
    example "self.") and 'name' is the full dotted name.
    """
    import keyword
    import tokenize
    vrs = []
    lasttoken = None
    parent = None
    prefix = ""
    # Must exist before the loop: a "." can follow a token that is not a
    # name (for example a string literal), in which case nothing was looked up.
    value = __UNDEF__
    for ttype, token, start, end, line in tokenize.generate_tokens(reader):
        if ttype == tokenize.NEWLINE:
            break
        elif ttype == tokenize.NAME and token not in keyword.kwlist:
            if lasttoken == ".":
                if parent is not __UNDEF__:
                    value = getattr(parent, token, __UNDEF__)
                    vrs.append((prefix + token, prefix, value))
            else:
                (where, value) = _lookup(token, frame, locls)
                vrs.append((token, where, value))
        elif token == ".":
            prefix += (lasttoken or "") + "."
            parent = value
        else:
            parent = None
            prefix = ""
        lasttoken = token
    return vrs


def _tb_encode(s):
    """HTML-encode 's' and make its spaces non-breaking, for code display."""
    return html_encode(s).replace(" ", "&nbsp;")


# NOTE(review): the markup below was rebuilt around the surviving text and the
# "tb_code"/"tb_codehigh" class names; the original tags were lost from this
# copy of the file. Compare against a pristine copy before relying on the
# exact HTML.

# Closes elements a partially-sent page may have left open (modelled on
# cgitb.reset()) so that the traceback is visible.
_TB_RESET = ("</font></font></font></script></object></blockquote></pre>"
             "</table></table></table></table></table>")

# Placeholders: exception name (title), exception name (heading),
# interpreter description, timestamp.
_TB_PAGE_HEAD = """\
<html><head><title>jonpy traceback: %s</title>
<style type="text/css"><!--
  .tb_where { font-weight: bold; }
  .tb_code { font-family: monospace; }
  .tb_codehigh { font-family: monospace; background-color: #ffffcc; }
  .tb_dump { font-size: smaller; color: #606060; }
--></style>
</head><body>
<h1>%s</h1>
<p>%s<br>%s</p>
<p>A problem occurred in a Python script. Here is the sequence of function
calls leading up to the error, with the most recent first.</p>
"""


def _tb_dump_vars(vrs, limited_repr):
    """Render the output of _scanvars() as parallel text and HTML lists."""
    done = {}
    dump = []
    htdump = []
    for name, where, value in vrs:
        if name in done:
            continue
        done[name] = 1
        if value is __UNDEF__:
            dump.append("%s undefined" % (name,))
            htdump.append("%s <em>undefined</em>" % (_tb_encode(name),))
            continue
        shown = limited_repr(value)
        if where == "global":
            dump.append("global %s = %s" % (name, shown))
            htdump.append("<em>global</em> <strong>%s</strong> = %s" %
                          (_tb_encode(name), _tb_encode(shown)))
        elif where == "builtin":
            dump.append("builtin %s = %s" % (name, shown))
            htdump.append("<em>builtin</em> <strong>%s</strong> = %s" %
                          (_tb_encode(name), _tb_encode(shown)))
        elif where == "local":
            dump.append("%s = %s" % (name, shown))
            htdump.append("<strong>%s</strong> = %s" %
                          (_tb_encode(name), _tb_encode(shown)))
        else:
            # Attribute reference: 'where' is the dotted prefix.
            attr = name.split(".")[-1]
            dump.append("%s%s = %s" % (where, attr, shown))
            htdump.append("%s<strong>%s</strong> = %s" %
                          (_tb_encode(where), _tb_encode(attr),
                           _tb_encode(shown)))
    return dump, htdump


def traceback(req, html=0):
    """Report the exception currently being handled.

    A detailed traceback (source context and the values of the variables
    used on each failing line) is always sent to req.error(). If 'html' is
    true the same report is also written to the client as an HTML page; the
    headers and buffered output are discarded first when that is still
    possible.

    This code is adapted from the standard cgitb module, which cannot be
    used directly, mainly because it will not output to the log.
    """
    import inspect
    import linecache
    import time
    try:
        import reprlib
    except ImportError:  # Python 2 name
        import repr as reprlib

    limiter = reprlib.Repr()
    limiter.maxdict = 10
    limiter.maxlist = 10
    limiter.maxtuple = 10
    limiter.maxother = 200
    limiter.maxstring = 200
    limited_repr = limiter.repr
    format_value = lambda v: "=" + limited_repr(v)

    (etype, evalue, etb) = sys.exc_info()
    # Show "ValueError", not the repr of the class object. Old string
    # exceptions have no __name__ and are left alone.
    etype = getattr(etype, "__name__", etype)
    now = time.ctime(time.time())

    if html:
        try:
            req.clear_headers()
            req.clear_output()
            req.set_header("Content-Type", "text/html; charset=iso-8859-1")
        except SequencingError:
            # Some of the page has already been sent; the best we can do is
            # close whatever it may have left open.
            req.write(_TB_RESET)
        req.write(_TB_PAGE_HEAD % (
            _tb_encode(etype), _tb_encode(etype),
            "Python %s: %s" % (sys.version.split()[0], sys.executable),
            now))
    req.error("jonpy error: %s at %s\n" % (etype, now))

    if html:
        req.write("<p><strong>%s</strong>: %s" %
                  (_tb_encode(etype), _tb_encode(evalue)))
    req.error("%s: %s\n" % (etype, evalue))
    if html:
        req.write("</p>\n")

    records = inspect.getinnerframes(etb, 7)
    records.reverse()
    for frame, fn, lnum, func, lines, index in records:
        if html:
            req.write('<table border="0" cellspacing="0" cellpadding="2">\n')
        if fn:
            fn = os.path.abspath(fn)
        else:
            fn = "?"
        args, varargs, varkw, locls = inspect.getargvalues(frame)
        if func != "?":
            fav = inspect.formatargvalues(args, varargs, varkw, locls,
                                          formatvalue=format_value)
            if html:
                req.write('<tr><td class="tb_where">%s in '
                          '<strong>%s</strong>%s</td></tr>\n' %
                          (_tb_encode(fn), _tb_encode(func), _tb_encode(fav)))
            req.error("%s in %s%s\n" % (fn, func, fav))
        else:
            if html:
                req.write('<tr><td class="tb_where">%s</td></tr>\n' %
                          (_tb_encode(fn),))
            req.error("%s\n" % (fn,))

        # Feed the tokenizer the failing statement line by line, remembering
        # every line it consumed so that those lines can be highlighted.
        highlight = {}
        cursor = [lnum]

        def reader():
            current = cursor[0]
            highlight[current] = 1
            cursor[0] = current + 1
            return linecache.getline(fn, current)

        vrs = _scanvars(reader, frame, locls)

        if index is not None:
            i = lnum - index
            for line in lines:
                number = " " * (5 - len(str(i))) + str(i)
                if html:
                    if i in highlight:
                        style = "tb_codehigh"
                    else:
                        style = "tb_code"
                    req.write('<tr><td class="%s">%s %s</td></tr>\n' %
                              (style, _tb_encode(number), _tb_encode(line)))
                req.error("%s %s" % (number, line))
                i += 1

        dump, htdump = _tb_dump_vars(vrs, limited_repr)
        if html:
            req.write('<tr><td class="tb_dump">%s</td></tr>\n' %
                      (", ".join(htdump),))
        req.error(", ".join(dump) + "\n")
        if html:
            req.write("</table>\n")

    if html:
        req.write("</body></html>\n")
    linecache.clearcache()


class Request(object):
    """All the information about a CGI-style request, including how to respond.

    Headers are buffered in a list before being sent. They are either sent on
    request, or when the first part of the body is sent. If requested, the
    body output can be buffered as well.

    Sub-classes must provide the 'environ' and 'stdin' attributes and
    override error() and _write().
    """

    def __init__(self, handler_type):
        """Create a Request object which uses handler_type as its handler.

        An object of type handler_type, which should be a subclass of
        Handler, will be used to handle requests.
        """
        self._handler_type = handler_type

    def _init(self):
        """Reset all per-request state. Requires self.environ to be set."""
        self._doneHeaders = 0
        self._headers = []
        self._bufferOutput = 1
        self._output = StringIO.StringIO()
        self._pos = 0  # number of body bytes already handed to _write()
        self.closed = 0
        # Drop parameters cached from a previous request so that they are
        # parsed again on demand by __getattr__.
        try:
            del self.params
        except AttributeError:
            pass
        self.cookies = Cookie.SimpleCookie()
        if "HTTP_COOKIE" in self.environ:
            self.cookies.load(self.environ["HTTP_COOKIE"])
        self.aborted = 0
        self.set_header("Content-Type", "text/html; charset=iso-8859-1")

    def __getattr__(self, name):
        """Parse the request parameters the first time 'params' is read."""
        if name == "params":
            # Assign first so that the parsing code can itself use
            # self.params without recursing back into here.
            self.params = {}
            self._read_cgi_data(self.environ, self.stdin)
            return self.__dict__["params"]
        raise AttributeError("%s instance has no attribute %s" %
                             (self.__class__.__name__, repr(name)))

    def close(self):
        """Closes the output stream."""
        if not self.closed:
            self.flush()
            self._close()
            self.closed = 1

    def _check_open(self):
        """Raise ValueError if the request has been closed."""
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def output_headers(self):
        """Output the list of headers."""
        self._check_open()
        if self._doneHeaders:
            raise SequencingError("output_headers() called twice")
        for pair in self._headers:
            self._write("%s: %s\r\n" % pair)
        self._write("\r\n")
        self._doneHeaders = 1

    def clear_headers(self):
        """Clear the list of headers."""
        self._check_open()
        if self._doneHeaders:
            raise SequencingError(
                "cannot clear_headers() after output_headers()")
        self._headers = []

    def add_header(self, hdr, val):
        """Add a header to the list of headers."""
        self._check_open()
        if self._doneHeaders:
            raise SequencingError(
                "cannot add_header(%s) after output_headers()" % repr(hdr))
        self._headers.append((hdr, val))

    def set_header(self, hdr, val):
        """Add a header to the list of headers, replacing any existing values."""
        self._check_open()
        if self._doneHeaders:
            raise SequencingError(
                "cannot set_header(%s) after output_headers()" % repr(hdr))
        self.del_header(hdr)
        self._headers.append((hdr, val))

    def get_header(self, hdr, index=0):
        """Retrieve a header from the list of headers.

        Header names are compared case-insensitively. 'index' selects among
        repeated headers; None is returned if there is no such header.
        """
        i = 0
        hdr = hdr.lower()
        for pair in self._headers:
            if pair[0].lower() == hdr:
                if i == index:
                    return pair[1]
                i += 1
        return None

    def del_header(self, hdr):
        """Removes all values for a header from the list of headers."""
        self._check_open()
        if self._doneHeaders:
            raise SequencingError(
                "cannot del_header(%s) after output_headers()" % repr(hdr))
        hdr = hdr.lower()
        self._headers = [pair for pair in self._headers
                         if pair[0].lower() != hdr]

    def set_buffering(self, f):
        """Specifies whether or not body output is buffered."""
        self._check_open()
        if self._output.tell() > 0 and not f:
            self.flush()
        self._bufferOutput = f

    def flush(self):
        """Flushes the body output."""
        self._check_open()
        if not self._doneHeaders:
            self.output_headers()
        self._write(self._output.getvalue())
        self._pos += self._output.tell()
        self._output.seek(0, 0)
        self._output.truncate()
        self._flush()

    def clear_output(self):
        """Discards the contents of the body output buffer."""
        self._check_open()
        if not self._bufferOutput:
            raise SequencingError("cannot clear output when not buffering")
        self._output.seek(0, 0)
        self._output.truncate()

    def error(self, s):
        """Records an error message from the program.

        The output is logged or otherwise stored on the server. It does not
        go to the client. Must be overridden by the sub-class.
        """
        raise NotImplementedError("error must be overridden")

    def _write(self, s):
        """Sends some data to the client.

        Must be overridden by the sub-class.
        """
        raise NotImplementedError("_write must be overridden")

    def _flush(self):
        """Flushes data to the client.

        May be overridden by the sub-class.
        """
        pass

    def _close(self):
        """Closes the output stream.

        May be overridden by the sub-class.
        """
        pass

    def write(self, s):
        """Sends some data to the client."""
        self._check_open()
        s = str(s)
        if self._bufferOutput:
            self._output.write(s)
        else:
            if not self._doneHeaders:
                self.output_headers()
            self._pos += len(s)
            self._write(s)

    def tell(self):
        """Return the current position in the body output."""
        return self._pos + self._output.tell()

    def seek(self, offset, whence=0):
        """Move the current position in the body output.

        'whence' is 0 (absolute), 1 (relative to the current position) or
        2 (relative to the end). Raises ValueError for any other 'whence',
        or if the target lies in data that has already been sent.
        """
        self._check_open()
        currentpos = self._pos + self._output.tell()
        currentlen = self._pos + len(self._output.getvalue())
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = currentpos + offset
        elif whence == 2:
            newpos = currentlen + offset
        else:
            raise ValueError("Bad 'whence' argument to seek()")
        if newpos == currentpos:
            return
        elif newpos < self._pos:
            raise ValueError("Cannot seek backwards into already-sent data")
        elif newpos <= currentlen:
            self._output.seek(newpos - self._pos)
        else:
            if self._bufferOutput:
                self._output.seek(newpos - self._pos)
            else:
                # Pad with NULs through write() so that the headers go out
                # first and the position counter follows the padding.
                self.write("\0" * (newpos - self._pos))

    def _mergevars(self, encoded):
        """Parse variable-value pairs from a URL-encoded string.

        Extract the variable-value pairs from the URL-encoded input string
        and merge them into self.params. Variable-value pairs are separated
        from each other by the '&' character. Missing values are allowed (the
        value is then None). If the variable name ends with a '*' character,
        then the value that is placed in the dictionary will be a list. This
        is useful for multiple-value fields. Names ending in '!' or '!*' are
        ignored.
        """
        for pair in encoded.split("&"):
            if pair == "":
                continue
            nameval = pair.split("=", 1)
            name = url_decode(nameval[0])
            if len(nameval) > 1:
                val = url_decode(nameval[1])
            else:
                val = None
            if name.endswith("!") or name.endswith("!*"):
                continue
            if name.endswith("*"):
                self.params.setdefault(name, []).append(val)
            else:
                self.params[name] = val

    def _mergemime(self, contenttype, encoded):
        """Parses variable-value pairs from a MIME-encoded input stream.

        Extract the variable-value pairs from the MIME-encoded input file and
        merge them into self.params. If the variable name ends with a '*'
        character, then the value that is placed in the dictionary will be a
        list. This is useful for multiple-value fields. If the variable name
        ends with a '!' character (before the '*' if present) then the value
        will be a mime.Entity object.
        """
        import mime
        headers = "Content-Type: %s\n" % contenttype
        for entity in mime.Entity(encoded.read(), mime=1,
                                  headers=headers).entities:
            if not entity.content_disposition:
                continue
            if entity.content_disposition[0] != 'form-data':
                continue
            name = entity.content_disposition[1].get("name")
            if name is None:
                # A part without a field name cannot be stored.
                continue
            if name[-1:] == "*":
                if name[-2:-1] == "!":
                    value = entity
                else:
                    value = entity.body
                self.params.setdefault(name, []).append(value)
            elif name[-1:] == "!":
                self.params[name] = entity
            else:
                self.params[name] = entity.body

    def _read_cgi_data(self, environ, inf):
        """Read input data from the client and set up the object attributes."""
        if "QUERY_STRING" in environ:
            self._mergevars(environ["QUERY_STRING"])
        if environ.get("REQUEST_METHOD") == "POST":
            if environ.get("CONTENT_TYPE", "").startswith(
                    "multipart/form-data"):
                self._mergemime(environ["CONTENT_TYPE"], inf)
            else:
                # Servers may pass CONTENT_LENGTH as an empty string; treat
                # that like a missing header (read to end of input).
                length = environ.get("CONTENT_LENGTH") or "-1"
                self._mergevars(inf.read(int(length)))

    def traceback(self):
        """Log the current exception and send the client a generic error page.

        The details go to self.error() only; nothing about the failure is
        revealed to the client.
        """
        traceback(self)
        try:
            self.clear_headers()
            self.clear_output()
            self.set_header("Content-Type", "text/html; charset=iso-8859-1")
        except SequencingError:
            pass
        # NOTE(review): tags rebuilt around the surviving text; the original
        # markup was lost from this copy of the file.
        self.write("""\
<html><head><title>Error</title></head>
<body><h1>Error</h1>
<p>Sorry, an error occurred. Please try again later.</p>
</body></html>
""")
""") class GZipMixIn(object): def _init(self): self._gzip = None self._gzip_level = 6 super(GZipMixIn, self)._init() def _close(self): if self._gzip: import struct super(GZipMixIn, self)._write(self._gzip.flush(self._gzip_zlib.Z_FINISH)) super(GZipMixIn, self)._write( struct.pack("