#!/usr/bin/env python # -*- coding: iso-8859-1 -*- ## Copyright 1999-2007 by LivingLogic AG, Bayreuth/Germany. ## Copyright 1999-2007 by Walter Dörwald ## ## All Rights Reserved ## ## See __init__.py for the license """ ll.url contains an RFC2396 compliant implementation of &url;s and classes for accessing resource metadata as well as file like classes for reading and writing resource data. These three levels of functionality are implemented in three classes: URL URLs are the names of resources and can be used and modified, regardless of the fact whether these resources actually exits. URLs never hits the hard drive or the net. Connection Connection objects contain functionality that accesses and changes file metadata (like last modified date, permission bits, directory structure etc.). A connection object can be created by calling the connect method on a URL object. Resource Resources are file like objects that work with the actual bytes that make up the file data. This functionality lives in the Resource class and it's subclasses. Creating a resource is done by calling the open method on a connection or a URL. """ __version__ = tuple(map(int, "$Revision: 1.55.2.6 $"[11:-2].split("."))) # $Source: /data/cvsroot/LivingLogic/Python/core/src/ll/url.py,v $ import sys, os, urllib, urllib2, types, mimetypes, mimetools, cStringIO, warnings import datetime, cgi, fnmatch, cPickle, errno, threading try: from email import utils as emutils except ImportError: from email import Utils as emutils # don't fail when pwd or grp can't be imported, because if this doesn't work, # we're probably on Windows and os.chown won't work anyway try: import pwd, grp except ImportError: pass try: import py except ImportError: py = None try: import Image except ImportError: pass try: import astyle except ImportError: from ll import astyle from ll import misc os.stat_float_times(True) def mime2dt(s): return datetime.datetime(*emutils.parsedate(s)[:7]) weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] monthname = [None, "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] def httpdate(dt): """ Return a string suitable for a Last-Modified and Expires header. dt is a datetime.datetime object in UTC. """ return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (weekdayname[dt.weekday()], dt.day, monthname[dt.month], dt.year, dt.hour, dt.minute, dt.second) from _url import escape as _escape, unescape as _unescape, normalizepath as _normalizepath alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" alphanum = alpha + "0123456789" mark = "-_.!~*'()" additionalsafe = "[]" safe = alphanum + mark + additionalsafe pathsafe = safe + ":@&=+$," + "|" # add "|" for Windows paths querysafe = alphanum fragsafe = alphanum schemecharfirst = alpha schemechar = alphanum + "+-." def _urlencode(query_parts): if query_parts is not None: res = [] items = query_parts.items() # generate a canonical order for the names items.sort() for (name, values) in items: if not isinstance(values, (list, tuple)): values = (values,) else: # generate a canonical order for the values values.sort() for value in values: res.append("%s=%s" % (_escape(name, querysafe), _escape(value, querysafe))) return "&".join(res) else: return None contextstack = threading.local() class Context(object): """ Calling URL.open or URL.connect creates a connection object. To avoid constantly creating new connections you can pass a Context object to those methods. Connections will be stored in the Context object and will be reused by those methods. A Context object can also be used as a context manager (see PEP 346 for more info). This context object will be used for all open and connect calls inside the with block. (Note that after the end of the with block, all connections will be closed.) """ def __init__(self): self.schemes = {} def closeall(self): """ Close and drop all connections in this context. """ for scheme in self.schemes: schemereg[scheme].closeall(self) self.schemes = {} def __enter__(self): try: stack = getattr(contextstack, "ll.url.contexts") except AttributeError: stack = [] setattr(contextstack, "ll.url.contexts", stack) stack.append(self) def __exit__(self, type, value, traceback): stack = getattr(contextstack, "ll.url.contexts") stack.pop() self.closeall() defaultcontext = Context() def getcontext(context): if context is None: try: stack = getattr(contextstack, "ll.url.contexts") except AttributeError: return defaultcontext try: return stack[-1] except IndexError: return defaultcontext return context class Connection(object): """ A Connection object is used for accessing and modifying the metadata associated with a file. It it created by calling the connect method on a URL object. """ @misc.notimplemented def stat(self, url): """ Return the result of a stat() call on the file url. """ @misc.notimplemented def lstat(self, url): """ Return the result of a stat() call on the file url. Like stat, but does not follow symbolic links. """ @misc.notimplemented def chmod(self, url, mode): """ Set the access mode of the file url to mode. """ @misc.notimplemented def chown(self, url, owner=None, group=None): """ Change the owner and/or group of the file url. """ @misc.notimplemented def lchown(self, url, owner=None, group=None): """ Change the owner and/or group of the file url (ignoring symbolic links). """ @misc.notimplemented def uid(self, url): """ Return the user id of the owner of the file url. """ @misc.notimplemented def gid(self, url): """ Return the group id the file url belongs to. """ @misc.notimplemented def owner(self, url): """ Return the name of the owner of the file url. """ @misc.notimplemented def group(self, url): """ Return the name of the group the file url belongs to. """ def mimetype(self, url): """ Return the mimetype of the file url. """ name = self._url2filename(url) mimetype = mimetypes.guess_type(name)[0] return mimetype or "application/octet-stream" @misc.notimplemented def exists(self, url): """ Test whether the file url exists. """ @misc.notimplemented def isfile(self, url): """ Test whether the resource url is a file. """ @misc.notimplemented def isdir(self, url): """ Test whether the resource url is a directory. """ @misc.notimplemented def islink(self, url): """ Test whether the resource url is a link. """ @misc.notimplemented def ismount(self, url): """ Test whether the resource url is a mount point. """ @misc.notimplemented def access(self, url, mode): """ Test for access to the file/resource url. """ def size(self, url): """ Return the size of the file url. """ return self.stat(url).st_size def imagesize(self, url): """ Return the size of the image url (if the resource is an image file) as a (width, height) tuple. This requires PIL. """ stream = self.open(url, "rb") img = Image.open(stream) # Requires PIL imagesize = img.size stream.close() return imagesize def cdate(self, url): """ Return the metadate change date of the file/resource url as a datetime.datetime object in UTC. """ return datetime.datetime.utcfromtimestamp(self.stat(url).st_ctime) def adate(self, url): """ Return the last access date of the file/resource url as a datetime.datetime object in UTC. """ return datetime.datetime.utcfromtimestamp(self.stat(url).st_atime) def mdate(self, url): """ Return the last modification date of the file/resource url as a datetime.datetime object in UTC. """ return datetime.datetime.utcfromtimestamp(self.stat(url).st_mtime) def resheaders(self, url): """ Return the &mime; headers for the file/resource url. """ return mimetools.Message( cStringIO.StringIO( "Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n" % (self.mimetype(url), self.size(url), httpdate(self.mdate(url))) ) ) @misc.notimplemented def remove(self, url): """ Remove the file url. """ @misc.notimplemented def rmdir(self, url): """ Remove the directory url. """ @misc.notimplemented def rename(self, url, target): """ Renames url to target. This might not work if target has a different scheme than url (or is on a different server). """ @misc.notimplemented def link(self, url, target): """ Create a hard link from url to target. This will not work if target has a different scheme than url (or is on a different server). """ @misc.notimplemented def symlink(self, url, target): """ Create a symbolic link from url to target. This will not work if target has a different scheme than url (or is on a different server). """ @misc.notimplemented def chdir(self, url): """ Change the current directory to url. """ os.chdir(self.name) @misc.notimplemented def mkdir(self, url, mode=0777): """ Create the directory url. """ @misc.notimplemented def makedirs(self, url, mode=0777): """ Create the directory url and all intermediate ones. """ @misc.notimplemented def listdir(self, url, pattern=None): """ Return a list of items in the directory url. The elements of the list are URL objects relative to url. With the optional pattern argument, this only lists items whose names match the given pattern. """ @misc.notimplemented def files(self, url, pattern=None): """ Return a list of files in the directory url. The elements of the list are URL objects relative to url. With the optional pattern argument, this only lists items whose names match the given pattern. """ @misc.notimplemented def dirs(self, url, pattern=None): """ Return a list of directories in the directory url. The elements of the list are URL objects relative to url. With the optional pattern argument, this only lists items whose names match the given pattern. """ @misc.notimplemented def walk(self, url, pattern=None): """ Return a recursive iterator over files and subdirectories. The iterator yields URL objects naming each child &url; of the directory url and its descendants relative to url. This performs a depth-first traversal, returning each directory before all its children. With the optional pattern argument, only yield items whose names match the given pattern. """ @misc.notimplemented def walkfiles(self, url, pattern=None): """ Return a recursive iterator over files in the directory url. With the optional pattern argument, only yield files whose names match the given pattern. """ @misc.notimplemented def walkdirs(self, url, pattern=None): """ Return a recursive iterator over subdirectories in the directory url. With the optional pattern argument, only yield directories whose names match the given pattern. """ @misc.notimplemented def open(self, url, *args, **kwargs): """ Open url for reading or writing. open returns a Resource object. Which additional parameters are supported depends on the actual resource created. Some common parameters are: mode A string indicating how the file is to be opened (just like the mode argument for the builtin open (e.g. "rb" or "wb"). headers Additional headers to use for an &http; request. data Request body to use for an &http; POST request. remotepython Name of the Python interpreter to use on the remote side (used by ssh &url;s) identity filename to be used as the identity file (private key) for authentication (used by ssh &url;s) """ class LocalConnection(Connection): def _url2filename(self, url): return os.path.expanduser(url.local()) def stat(self, url): return os.stat(self._url2filename(url)) def lstat(self, url): return os.lstat(self._url2filename(url)) def chmod(self, url, mode): name = self._url2filename(url) os.chmod(name, mode) def _chown(self, func, url, owner, group): name = self._url2filename(url) if owner is not None or group is not None: if owner is None or group is None: stat = os.stat(name) if owner is None: owner = stat.st_uid elif isinstance(owner, basestring): owner = pwd.getpwnam(owner)[2] if group is None: group = stat.st_gid elif isinstance(group, basestring): group = grp.getgrnam(group)[2] func(name, owner, group) def chown(self, url, owner=None, group=None): self._chown(os.chown, url, owner, group) def lchown(self, url, owner=None, group=None): self._chown(os.lchown, url, owner, group) def chdir(self, url): os.chdir(self._url2filename(url)) def mkdir(self, url, mode=0777): os.mkdir(self._url2filename(url), mode) def makedirs(self, url, mode=0777): os.makedirs(self._url2filename(url), mode) def uid(self, url): return self.stat(url).st_uid def gid(self, url): return self.stat(url).st_gid def owner(self, url): return pwd.getpwuid(self.uid(url))[0] def group(self, url): return grp.getgrgid(self.gid(url))[0] def exists(self, url): return os.path.exists(self._url2filename(url)) def isfile(self, url): return os.path.isfile(self._url2filename(url)) def isdir(self, url): return os.path.isdir(self._url2filename(url)) def islink(self, url): return os.path.islink(self._url2filename(url)) def ismount(self, url): return os.path.ismount(self._url2filename(url)) def access(self, url, mode): return os.access(self._url2filename(url), mode) def remove(self, url): return os.remove(self._url2filename(url)) def rmdir(self, url): return os.rmdir(self._url2filename(url)) def rename(self, url, target): name = self._url2filename(url) if not isinstance(target, URL): target = URL(target) targetname = self._url2filename(target) os.rename(name, target) def link(self, url, target): name = self._url2filename(url) if not isinstance(target, URL): target = URL(target) target = self._url2filename(target) os.link(name, target) def symlink(self, url, target): name = self._url2filename(url) if not isinstance(target, URL): target = URL(target) target = self._url2filename(target) os.symlink(name, target) def listdir(self, url, pattern=None): name = self._url2filename(url) result = [] for childname in os.listdir(name): if pattern is None or fnmatch.fnmatch(childname, pattern): if os.path.isdir(os.path.join(name, childname)): result.append(Dir(childname, scheme=url.scheme)) else: result.append(File(childname, scheme=url.scheme)) return result def files(self, url, pattern=None): name = self._url2filename(url) result = [] for childname in os.listdir(name): if pattern is None or fnmatch.fnmatch(childname, pattern): if os.path.isfile(os.path.join(name, childname)): result.append(File(childname, scheme=url.scheme)) return result def dirs(self, url, pattern=None): name = self._url2filename(url) result = [] for childname in os.listdir(name): if pattern is None or fnmatch.fnmatch(childname, pattern): if os.path.isdir(os.path.join(name, childname)): result.append(Dir(childname, scheme=url.scheme)) return result def _walk(self, base, name, pattern, which): if name: fullname = os.path.join(base, name) else: fullname = base for childname in os.listdir(fullname): fullchildname = os.path.join(fullname, childname) relchildname = os.path.join(name, childname) isdir = os.path.isdir(fullchildname) if (pattern is None or fnmatch.fnmatch(childname, pattern)) and which[isdir]: url = urllib.pathname2url(relchildname) if isdir: url += "/" yield URL(url) if isdir: for subchild in self._walk(base, relchildname, pattern, which): yield subchild def walk(self, url, pattern=None): return self._walk(self._url2filename(url), "", pattern, (True, True)) def walkfiles(self, url, pattern=None): return self._walk(self._url2filename(url), "", pattern, (True, False)) def walkdirs(self, url, pattern=None): return self._walk(self._url2filename(url), "", pattern, (False, True)) def open(self, url, mode="rb"): return FileResource(url, mode) if py is not None: class SshConnection(Connection): remote_code = py.code.Source(""" import os, urllib, cPickle, fnmatch os.stat_float_times(True) files = {} iterators = {} def ownergroup(filename, owner=None, group=None): if owner is not None or group is not None: if owner is None or group is None: if isinstance(filename, basestring): stat = os.stat(filename) else: stat = os.fstat(files[filename].fileno()) if owner is None: owner = stat.st_uid elif isinstance(owner, basestring): import pwd owner = pwd.getpwnam(owner)[2] if group is None: group = stat.st_gid elif isinstance(group, basestring): import grp group = grp.getgrnam(group)[2] return (owner, group) def _walk(base, name, pattern, which): if name: fullname = os.path.join(base, name) else: fullname = base for childname in os.listdir(fullname): fullchildname = os.path.join(fullname, childname) relchildname = os.path.join(name, childname) isdir = os.path.isdir(fullchildname) if (pattern is None or fnmatch.fnmatch(childname, pattern)) and which[isdir]: url = urllib.pathname2url(relchildname) if isdir: url += "/" yield url if isdir: for subchild in _walk(base, relchildname, pattern, which): yield subchild def walk(filename, pattern=None): return _walk(filename, "", pattern, (True, True)) def walkfiles(filename, pattern=None): return _walk(filename, "", pattern, (True, False)) def walkdirs(filename, pattern=None): return _walk(filename, "", pattern, (False, True)) while True: (filename, cmdname, args, kwargs) = channel.receive() if isinstance(filename, basestring): filename = os.path.expanduser(urllib.url2pathname(filename)) data = None try: if cmdname == "open": try: stream = open(filename, *args, **kwargs) except IOError, exc: if "w" not in args[0] or exc[0] != 2: # didn't work for some other reason than a non existing directory raise (splitpath, splitname) = os.path.split(filename) if splitpath: os.makedirs(splitpath) stream = open(filename, *args, **kwargs) else: raise # we don't have a directory to make so pass the error on data = id(stream) files[data] = stream elif cmdname == "stat": if isinstance(filename, basestring): data = os.stat(filename) else: data = os.fstat(files[filename].fileno()) elif cmdname == "lstat": data = os.lstat(filename) elif cmdname == "close": try: stream = files[filename] except KeyError: pass else: stream.close() del files[filename] elif cmdname == "chmod": data = os.chmod(filename, *args, **kwargs) elif cmdname == "chown": (owner, group) = ownergroup(filename, *args, **kwargs) if owner is not None: data = os.chown(filename, owner, group) elif cmdname == "lchown": (owner, group) = ownergroup(filename, *args, **kwargs) if owner is not None: data = os.lchown(filename, owner, group) elif cmdname == "uid": stat = os.stat(filename) data = stat.st_uid elif cmdname == "gid": stat = os.stat(filename) data = stat.st_gid elif cmdname == "owner": import pwd stat = os.stat(filename) data = pwd.getpwuid(stat.st_uid)[0] elif cmdname == "group": import grp stat = os.stat(filename) data = grp.getgrgid(stat.st_gid)[0] elif cmdname == "exists": data = os.path.exists(filename) elif cmdname == "isfile": data = os.path.isfile(filename) elif cmdname == "isdir": data = os.path.isdir(filename) elif cmdname == "islink": data = os.path.islink(filename) elif cmdname == "ismount": data = os.path.ismount(filename) elif cmdname == "access": data = os.access(filename, *args, **kwargs) elif cmdname == "remove": data = os.remove(filename) elif cmdname == "rmdir": data = os.rmdir(filename) elif cmdname == "rename": data = os.rename(filename, os.path.expanduser(args[0])) elif cmdname == "link": data = os.link(filename, os.path.expanduser(args[0])) elif cmdname == "symlink": data = os.symlink(filename, os.path.expanduser(args[0])) elif cmdname == "chdir": data = os.chdir(filename) elif cmdname == "mkdir": data = os.mkdir(filename) elif cmdname == "makedirs": data = os.makedirs(filename) elif cmdname == "makefifo": data = os.makefifo(filename) elif cmdname == "listdir": data = [] for f in os.listdir(filename): if args[0] is None or fnmatch.fnmatch(f, args[0]): data.append((os.path.isdir(os.path.join(filename, f)), f)) elif cmdname == "files": data = [] for f in os.listdir(filename): if args[0] is None or fnmatch.fnmatch(f, args[0]): if os.path.isfile(os.path.join(filename, f)): data.append(f) elif cmdname == "dirs": data = [] for f in os.listdir(filename): if args[0] is None or fnmatch.fnmatch(f, args[0]): if os.path.isdir(os.path.join(filename, f)): data.append(f) elif cmdname == "walk": iterator = walk(filename, *args, **kwargs) data = id(iterator) iterators[data] = iterator elif cmdname == "walkfiles": iterator = walkfiles(filename, *args, **kwargs) data = id(iterator) iterators[data] = iterator elif cmdname == "walkdirs": iterator = walkdirs(filename, *args, **kwargs) data = id(iterator) iterators[data] = iterator elif cmdname == "iteratornext": try: data = iterators[filename].next() except StopIteration: del iterators[filename] raise else: data = getattr(files[filename], cmdname) data = data(*args, **kwargs) except Exception, exc: if exc.__class__.__module__ != "exceptions": raise channel.send((True, cPickle.dumps(exc))) else: channel.send((False, data)) """) def __init__(self, context, server, remotepython="python", identity=None): # We don't have to store the context (this avoids cycles) self.server = server gateway = py.execnet.SshGateway(server, remotepython=remotepython, identity=identity) self._channel = gateway.remote_exec(self.remote_code) def close(self): if not self._channel.isclosed(): self._channel.close() self._channel.gateway.exit() self._channel.gateway.join() def _url2filename(self, url): if url.scheme != "ssh": raise ValueError("URL %r is not an ssh URL" % url) filename = str(url.path) if filename.startswith("/~"): filename = filename[1:] return filename def _send(self, filename, cmd, *args, **kwargs): self._channel.send((filename, cmd, args, kwargs)) (isexc, data) = self._channel.receive() if isexc: raise cPickle.loads(data) else: return data def stat(self, url): filename = self._url2filename(url) data = self._send(filename, "stat") return os.stat_result(data) # channel returned a tuple => wrap it def lstat(self): filename = self._url2filename(url) data = self._send(filename, "lstat") return os.stat_result(data) # channel returned a tuple => wrap it def chmod(self, url, mode): return self._send(self._url2filename(url), "chmod", mode) def chown(self, url, owner=None, group=None): return self._send(self._url2filename(url), "chown", owner, group) def lchown(self, url, owner=None, group=None): return self._send(self._url2filename(url), "lchown", owner, group) def chdir(self, url): return self._send(self._url2filename(url), "chdir") def mkdir(self, url, mode=0777): return self._send(self._url2filename(url), "mkdir", mode) def makedirs(self, url, mode=0777): return self._send(self._url2filename(url), "makedirs", mode) def uid(self, url): return self._send(self._url2filename(url), "uid") def gid(self, url): return self._send(self._url2filename(url), "gid") def owner(self, url): return self._send(self._url2filename(url), "owner") def group(self, url): return self._send(self._url2filename(url), "group") def exists(self, url): return self._send(self._url2filename(url), "exists") def isfile(self, url): return self._send(self._url2filename(url), "isfile") def isdir(self, url): return self._send(self._url2filename(url), "isdir") def islink(self, url): return self._send(self._url2filename(url), "islink") def ismount(self, url): return self._send(self._url2filename(url), "ismount") def access(self, url, mode): return self._send(self._url2filename(url), "access", mode) def remove(self, url): return self._send(self._url2filename(url), "remove") def rmdir(self, url): return self._send(self._url2filename(url), "rmdir") def _cmdwithtarget(self, cmdname, url, target): filename = self._url2filename(url) if not isinstance(target, URL): target = URL(target) targetname = self._url2filename(target) if target.server != url.server: raise OSError(errno.EXDEV, os.strerror(errno.EXDEV)) return self._send(filename, cmdname, targetname) def rename(self, url, target): return self._cmdwithtarget("rename", url, target) def link(self, url, target): return self._cmdwithtarget("link", url, target) def symlink(self, url, target): return self._cmdwithtarget("symlink", url, target) def listdir(self, url, pattern=None): filename = self._url2filename(url) result = [] for (isdir, name) in self._send(filename, "listdir", pattern): name = urllib.pathname2url(name) if isdir: name += "/" result.append(URL(name)) return result def files(self, url, pattern=None): filename = self._url2filename(url) return [URL(urllib.pathname2url(name)) for name in self._send(filename, "files", pattern)] def dirs(self, url, pattern=None): filename = self._url2filename(url) return [URL(urllib.pathname2url(name)+"/") for name in self._send(filename, "dirs", pattern)] def walk(self, url, pattern=None): filename = self._url2filename(url) iterator = self._send(filename, "walk", pattern) while True: yield URL(self._send(iterator, "iteratornext")) def walkfiles(self, url, pattern=None): filename = self._url2filename(url) iterator = self._send(filename, "walkfiles", pattern) while True: yield URL(self._send(iterator, "iteratornext")) def walkdirs(self, url, pattern=None): filename = self._url2filename(url) iterator = self._send(filename, "walkdirs", pattern) while True: yield URL(self._send(iterator, "iteratornext")) def open(self, url, mode="rb"): return RemoteFileResource(self, url, mode) def __repr__(self): return "<%s.%s to %r at 0x%x>" % (self.__class__.__module__, self.__class__.__name__, self.server, id(self)) class URLConnection(Connection): def mimetype(self, url): return url.open().mimetype() def size(self, url): return url.open().size() def imagesize(self, url): return url.open().imagesize() def mdate(self, url): return url.open().mdate() def resheaders(self, url): return url.open().resheaders() def open(self, url, mode="rb", headers=None, data=None): if mode != "rb": raise NotImplementedError("mode %r not supported" % mode) return URLResource(url, headers=headers, data=data) def here(scheme="file"): """ Return the current directory as an URL. """ return Dir(os.getcwd(), scheme) def home(user="", scheme="file"): """ Return the home directory of the current user (or the user named user, if user is specified) as an URL. >>> url.home() URL('file:/home/walter/') >>> url.home("andreas") URL('file:/home/andreas/') """ return Dir("~%s" % user, scheme) def root(): """ Return a blank root URL, i.e. URL("root:"). """ return URL("root:") def File(name, scheme="file"): """ Turn a filename into an URL: >>> url.File("a#b") URL('file:a%23b') """ name = urllib.pathname2url(os.path.expanduser(name.encode("utf-8"))) if name.startswith("///"): name = name[2:] url = URL(name) url.scheme = scheme return url def Dir(name, scheme="file"): """ Turns a directory name into an URL, just like File, but ensures that the path is terminated with a /: >>> url.Dir("a#b") URL('file:a%23b/') """ name = urllib.pathname2url(os.path.expanduser(name.encode("utf-8"))) if not name.endswith("/"): name += "/" if name.startswith("///"): name = name[2:] url = URL(name) url.scheme = scheme return url def Ssh(user, host, path="~/"): """ Return a ssh URL for the user user on the host host with the path path. path (defaulting to the users home directory) must be a path in &url; notation (i.e. use / as directory separator): >>> url.Ssh("root", "www.example.com", "~joe/public_html/index.html") URL('ssh://root@www.example.com/~joe/public_html/index.html') If the path starts with ~/ it is relative to this users home directory, if it starts with ~/user it's relative to the home directory of the user user. In all other cases the path is considered to be absolute. """ url = URL() url.scheme = "ssh" url.userinfo = user url.host = host if path.startswith("~"): path = "/" + path url.path = path return url def first(urls): """ Return the first &url; from urls that exists as a real file or directory. None entries in urls will be skipped. """ for url in urls: if url is not None: if url.exists(): return url def firstdir(urls): """ Return the first &url; from urls that exists as a real directory. None entries in urls will be skipped. """ for url in urls: if url is not None: if url.isdir(): return url def firstfile(urls): """ Return the first &url; from urls that exists as a real file. None entries in urls will be skipped. """ for url in urls: if url is not None: if url.isfile(): return url class importcache(dict): def remove(self, mod): try: dict.__delitem__(self, mod.__file__) except KeyError: pass importcache = importcache() def _import(filename): (path, name) = os.path.split(filename) (name, ext) = os.path.splitext(name) if ext != ".py": raise ImportError("Can only import .py files, not %s" % ext) oldmod = sys.modules.get(name, None) # get any existing module out of the way sys.modules[name] = mod = types.ModuleType(name) # create module and make sure it can find itself in sys.module mod.__file__ = filename execfile(filename, mod.__dict__) mod = sys.modules.pop(name) # refetch the module if it has replaced itself with a custom object if oldmod is not None: # put old module back sys.modules[name] = oldmod return mod class Resource(object): """ A Resource is a base class that provides a file-like interface to local and remote files, &url;s and other resources.
Attributes Each resource object has the following attributes: url The &url; for which this resource has been opened (i.e. foo.open().url is foo if foo is a URL object); name A string version of url; closed A bool specifying whether the resource has been closed (i.e. whether the close method has been called).
Methods In addition to file methods (like read, readlines, write and close) a resource object might provide the following methods: finalurl Return the real &url; of the resource (this might be different from the url attribute in case of a redirect). size Return the size of the file/resource. mdate Return the last modification date of the file/resource as a datetime.datetime object in UTC. mimetype Return the mimetype of the file/resource. imagesize Return the size of the image (if the resource is an image file) as a (width, height) tuple. This requires PIL.
""" def finalurl(self): return self.url def imagesize(self): pos = self.tell() self.seek(0) img = Image.open(self) # Requires PIL imagesize = img.size self.seek(pos) return imagesize def encoding(self): return None def __repr__(self): if self.closed: state = "closed" else: state = "open" return "<%s %s.%s %r, mode %r at 0x%x>" % (state, self.__class__.__module__, self.__class__.__name__, self.name, self.mode, id(self)) class FileResource(Resource, file): """ A subclass of Resource that handles local files. """ def __init__(self, url, mode="rb"): url = URL(url) name = os.path.expanduser(url.local()) try: file.__init__(self, name, mode) except IOError, exc: if "w" not in mode or exc[0] != 2: # didn't work for some other reason than a non existing directory raise (splitpath, splitname) = os.path.split(name) if splitpath: os.makedirs(splitpath) file.__init__(self, name, mode) else: raise # we don't have a directory to make so pass the error on self.url = url def size(self): # Forward to the connection return LocalSchemeDefinition._connection.size(self.url) def mdate(self): # Forward to the connection return LocalSchemeDefinition._connection.mdate(self.url) def mimetype(self): # Forward to the connection return LocalSchemeDefinition._connection.mimetype(self.url) if py is not None: class RemoteFileResource(Resource): """ A subclass of Resource that handles remote files (those with the ssh scheme). """ def __init__(self, connection, url, mode="rb"): self.connection = connection self.url = URL(url) self.mode = mode self.closed = False filename = self.connection._url2filename(url) self.name = str(self.url) self.remoteid = self._send(filename, "open", mode) def _send(self, filename, cmd, *args, **kwargs): if self.closed: raise ValueError("I/O operation on closed file") return self.connection._send(filename, cmd, *args, **kwargs) def close(self): if not self.closed: self._send(self.remoteid, "close") self.connection = None # close the channel too as there are no longer any meaningful operations self.closed = True def read(self, size=-1): return self._send(self.remoteid, "read", size) def readline(self, size=-1): return self._send(self.remoteid, "readline", size) def readlines(self, size=-1): return self._send(self.remoteid, "readlines", size) def __iter__(self): return self def next(self): return self._send(self.remoteid, "next") def seek(self, offset, whence=0): return self._send(self.remoteid, "seek", offset, whence) def tell(self): return self._send(self.remoteid, "tell") def truncate(self, size=None): if size is None: return self._send(self.remoteid, "truncate") else: return self._send(self.remoteid, "truncate", size) def write(self, string): return self._send(self.remoteid, "write", string) def writelines(self, strings): return self._send(self.remoteid, "writelines", strings) def flush(self): return self._send(self.remoteid, "flush") def size(self): # Forward to the connection return self.connection.size(self.url) def mdate(self): # Forward to the connection return self.connection.mdate(self.url) def mimetype(self): # Forward to the connection return self.connection.mimetype(self.url) class URLResource(Resource): """ A subclass of Resource that handles &http;, &ftp; and other &url;s (i.e. those that are not handled by FileResource or RemoteFileResource. """ def __init__(self, url, mode="rb", headers=None, data=None): if "w" in mode: raise ValueError("writing mode %r not supported" % mode) self.url = URL(url) self.name = str(self.url) self.mode = mode self.reqheaders = headers self.reqdata = data self._finalurl = None self.closed = False self._stream = None if data is not None: data = urllib.urlencode(data) if headers is None: headers = {} req = urllib2.Request(url=self.name, data=data, headers=headers) self._stream = urllib2.urlopen(req) self._finalurl = URL(self._stream.url) # Remember the final URL in case of a redirect self._resheaders = self._stream.info() self._mimetype = None self._encoding = None contenttype = self._resheaders.getheader("Content-Type") if contenttype is not None: (mimetype, options) = cgi.parse_header(contenttype) self._mimetype = mimetype self._encoding = options.get("charset") cl = self._resheaders.get("Content-Length") if cl: cl = int(cl) self._size = cl lm = self._resheaders.get("Last-Modified") if lm is not None: lm = mime2dt(lm) self._mdate = lm self._buffer = cStringIO.StringIO() def __getattr__(self, name): function = getattr(self._stream, name) def call(*args, **kwargs): return function(*args, **kwargs) return call def close(self): if not self.closed: self._stream.close() self._stream = None self.closed = True def __iter__(self): return iter(self._stream) def finalurl(self): return self._finalurl def mimetype(self): return self._mimetype def resheaders(self): return self._resheaders def encoding(self): return self._encoding def mdate(self): return self._mdate def size(self): return self._size def read(self, size=-1): data = self._stream.read(size) self._buffer.write(data) return data def readline(self, size=-1): data = self._stream.readline(size) self._buffer.write(data) return data def resdata(self): data = self._stream.read() self._buffer.write(data) return self._buffer.getvalue() def imagesize(self): img = Image.open(cStringIO.StringIO(self.resdata())) # Requires PIL return img.size def __iter__(self): while True: data = self._stream.readline() if not data: break self._buffer.write(data) yield data class SchemeDefinition(object): """ A SchemeDefinition instance defines the properties of a particular &url; scheme. """ _connection = URLConnection() def __init__(self, scheme, usehierarchy, useserver, usefrag, islocal=False, isremote=False, defaultport=None): """ Create a new SchemeDefinition instance. Arguments are: scheme: The name of the scheme; usehierarchy: Specifies whether this scheme uses hierarchical &url;s or opaque &url;s (i.e. whether hier_part or opaque_part from the &bnf; in RFC2396 is used); useserver: Specifies whether this scheme uses an Internet-based server authority component or a registry of naming authorities (only for hierarchical &url;s); usefrag: Specifies whether this scheme uses fragments (according to the &bnf; in RFC2396 every scheme does, but it doesn't make sense for e.g. "javascript", "mailto" or "tel"); islocal: Specifies whether &url;s with this scheme refer to local files; isremote: Specifies whether &url;s with this scheme refer to remote files (there may be schemes which are neither local nor remote, e.g. "mailto"); defaultport: The default port for this scheme (only for schemes using server based authority). """ self.scheme = scheme self.usehierarchy = usehierarchy self.useserver = useserver self.usefrag = usefrag self.islocal = islocal self.isremote = isremote self.defaultport = defaultport def connect(self, url, context=None, **kwargs): """ Create a connection for the URL url (which must have as the scheme). """ return self._connect(url, context, **kwargs)[0] def _connect(self, url, context=None, **kwargs): # Returns a tuple (connect, kwargs) (some of the keyword argument might have been consumed by the connect call, # the rest can be passed on the whatever call will be made on the connection itself) # We can always use the same connection here, because the connection for # local files and real URLs doesn't use any resources. # This will be overwritten by SshSchemeDefinition return (self._connection, kwargs) def open(self, *args, **kwargs): return URLConnection(*args, **kwargs) def closeall(self, context): """ Close all connections active for this scheme in the context context. """ def __repr__(self): return "<%s instance scheme=%r usehierarchy=%r useserver=%r usefrag=%r at 0x%x>" % (self.__class__.__name__, self.scheme, self.usehierarchy, self.useserver, self.usefrag, id(self)) class LocalSchemeDefinition(SchemeDefinition): # Use a different connection then the base class (but still one single connection for all URLs) _connection = LocalConnection() def open(self, *args, **kwargs): return FileResource(*args, **kwargs) class SshSchemeDefinition(SchemeDefinition): def _connect(self, url, context=None, **kwargs): if "remotepython" in kwargs or "identity" in kwargs: kwargs = kwargs.copy() remotepython = kwargs.pop("remotepython", "python") identity = kwargs.pop("identity", None) else: remotepython = "python" identity = None context = getcontext(context) if context is defaultcontext: raise ValueError("ssh URLs need a custom context") # Use one SshConnection for each user/host/remotepython combination server = url.server try: connections = context.schemes["ssh"] except KeyError: connections = context.schemes["ssh"] = {} try: connection = connections[(server, remotepython)] except KeyError: connection = connections[(server, remotepython)] = SshConnection(context, server, remotepython, identity) return (connection, kwargs) def open(self, url, mode="rb", context=None, remotepython="python", identity=None): (connection, kwargs) = self._connect(url, context, remotepython=remotepython, identity=identity) return RemoteFileResource(connection, url, mode, **kwargs) def closeall(self, context): for connection in context.schemes["ssh"].itervalues(): connection.close() schemereg = { "http": SchemeDefinition("http", usehierarchy=True, useserver=True, usefrag=True, isremote=True, defaultport=80), "https": SchemeDefinition("https", usehierarchy=True, useserver=True, usefrag=True, isremote=True, defaultport=443), "ftp": SchemeDefinition("ftp", usehierarchy=True, useserver=True, usefrag=True, isremote=True, defaultport=21), "file": LocalSchemeDefinition("file", usehierarchy=True, useserver=False, usefrag=True, islocal=True), "root": SchemeDefinition("root", usehierarchy=True, useserver=False, usefrag=True, islocal=True), "javascript": SchemeDefinition("javascript", usehierarchy=False, useserver=False, usefrag=False), "mailto": SchemeDefinition("mailto", usehierarchy=False, useserver=False, usefrag=False), "tel": SchemeDefinition("tel", usehierarchy=False, useserver=False, usefrag=False), "fax": SchemeDefinition("fax", usehierarchy=False, useserver=False, usefrag=False), "ssh": SshSchemeDefinition("ssh", usehierarchy=True, useserver=True, usefrag=True, islocal=False, isremote=True), } defaultreg = LocalSchemeDefinition("", usehierarchy=True, useserver=True, islocal=True, usefrag=True) class Path(object): __slots__ = ("_path", "_segments") def __init__(self, path=None): self._path = "" self._segments = [] self.path = path @classmethod def _fixsegment(cls, segment): if isinstance(segment, basestring): if isinstance(segment, unicode): segment = _escape(segment) return tuple(_unescape(name) for name in segment.split(";", 1)) else: assert 1 <= len(segment) <= 2, "path segment %r must have length 1 or 2, not %d" % (segment, len(segment)) return tuple(map(unicode, segment)) def _prefix(cls, path): if path.startswith("/"): return "/" else: return "" def insert(self, index, *others): segments = self.segments segments[index:index] = map(self._fixsegment, others) self.segments = segments def startswith(self, prefix): """ Return whether starts with the path prefix. prefix will be converted to a Path if it isn't one. """ if not isinstance(prefix, Path): prefix = Path(prefix) segments = prefix.segments if self.isabs != prefix.isabs: return False if segments and segments[-1] == (u"",) and len(self.segments)>len(segments): return self.segments[:len(segments)-1] == segments[:-1] else: return self.segments[:len(segments)] == segments def endswith(self, suffix): """ Return whether ends with the path suffix. suffix will be converted to a Path if it isn't one. If suffix is absolute a normal comparison will be done. """ if not isinstance(suffix, Path): suffix = Path(suffix) if suffix.isabs: return self == suffix else: segments = suffix.segments return self.segments[-len(segments):] == segments def clone(self): return Path(self) def __repr__(self): return "Path(%r)" % self._path def __str__(self): return self.path def __eq__(self, other): if not isinstance(other, Path): other = Path(other) return self._path == other._path def __ne__(self, other): return not self == other def __hash__(self): return hash(self._path) def __len__(self): return len(self.segments) def __getitem__(self, index): return self.segments[index] def __setitem__(self, index, value): segments = self.segments segments[index] = self._fixsegment(value) self._path = self._prefix(self._path) + self._segments2path(segments) self._segments = segments def __delitem__(self, index): segments = self.segments del segments[index] self._path = self._segments2path(segments) self._segments = segments def __contains__(self, item): return self._fixsegment(item) in self.segments def __getslice__(self, index1, index2): """ Return of slice of the path. The resulting path will always be relative, i.e. the leading / will be dropped. """ return Path(self.segments[index1:index2]) def __setslice__(self, index1, index2, seq): segments = self.segments segments[index1:index2] = map(self._fixsegment, seq) self._path = self._prefix(self._path) + self._segments2path(segments) self._segments = segments def __delslice__(self, index1, index2): del self.segments[index1:index2] class isabs(misc.propclass): """ Is the path absolute? """ def __get__(self): return self._path.startswith("/") def __set__(self, isabs): isabs = bool(isabs) if isabs != self._path.startswith("/"): if isabs: self._path = "/" + self._path else: self._path = self._path[1:] def __delete__(self): if self._path.startswith("/"): self._path = self._path[1:] @classmethod def _segments2path(cls, segments): return "/".join(";".join(_escape(value, pathsafe) for value in segment) for segment in segments) @classmethod def _path2segments(cls, path): if path.startswith("/"): path = path[1:] return map(cls._fixsegment, path.split("/")) def _setpathorsegments(self, path): if path is None: self._path = "" self._segments = [] elif isinstance(path, Path): self._path = path._path self._segments = None elif isinstance(path, (list, tuple)): self._segments = map(self._fixsegment, path) self._path = self._prefix(self._path) + self._segments2path(self._segments) else: if isinstance(path, unicode): path = _escape(path) prefix = self._prefix(path) if prefix: path = path[1:] self._segments = self._path2segments(path) self._path = prefix + self._segments2path(self._segments) class path(misc.propclass): """ The complete path as a string. """ def __get__(self): return self._path def __set__(self, path): self._setpathorsegments(path) def __delete__(self): self.clear() class segments(misc.propclass): """ The path as a list of (name, param) tuples. """ def __get__(self): if self._segments is None: self._segments = self._path2segments(self._path) return self._segments def __set__(self, path): self._setpathorsegments(path) def __delete__(self): self._path = self._prefix(self._path) self._segments = [] class file(misc.propclass): """ The filename without the path, i.e. the name part of the last component of path. The baz.html part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): try: return self[-1][0] except IndexError: return None def __set__(self, file): """ Setting the filename preserves the parameter in the last segment. """ if file is None: del self.file segments = self.segments if segments: if len(segments[-1]) == 1: self[-1] = (file, ) else: self[-1] = (file, segments[-1][1]) else: self.segments = [(file,)] def __delete__(self): """ Deleting the filename preserves the parameter in the last segment. """ segments = self.segments if segments: if len(segments[-1]) == 1: self[-1] = ("", ) else: self[-1] = ("", segments[-1][1]) class ext(misc.propclass): """ The filename extension of the last segment of the path. The html part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): ext = None segments = self.segments if segments: name = segments[-1][0] pos = name.rfind(".") if pos != -1: ext = name[pos+1:] return ext def __set__(self, ext): """ Setting the extension preserves the parameter in the last segment. """ if ext is None: del self.ext segments = self.segments if segments: segment = segments[-1] name = segment[0] pos = name.rfind(".") if pos != -1: name = name[:pos+1] + ext else: name = name + "." + ext if len(segment)>1: self[-1] = (name, segment[1]) else: self[-1] = (name, ) def __delete__(self): """ Deleting the extension preserves the parameter in the last segment. """ segments = self.segments if segments: segment = segments[-1] name = segment[0] pos = name.rfind(".") if pos != -1: name = name[:pos] if len(segment)>1: self[-1] = (name, segment[1]) else: self[-1] = (name, ) def withext(self, ext): """ Return a new Path where the filename extension has been replaced with ext. """ path = self.clone() path.ext = ext return path def withoutext(self): """ Return a new Path where the filename extension has been removed. """ if "/" not in self._path and self._path.rfind(".")==0: return Path("./") else: path = self.clone() del path.ext return path def withfile(self, file): """ Return a new Path where the filename (i.e. the name of last component of segments) has been replaced with file. """ path = self.clone() path.file = file return path def withoutfile(self): """ Return a new Path where the filename (i.e. the name of last component of segments) has been removed. """ if "/" not in self._path: return Path("./") else: path = Path(self) del path.file return path def clear(self): self._path = "" self._segments = [] def __div__(self, other): """ join two paths. """ if isinstance(other, basestring): other = Path(other) if isinstance(other, Path): newpath = Path() # RFC2396, Section 5.2 (5) if other.isabs: newpath._path = other._path newpath._segments = None else: # the following should be equivalent to RFC2396, Section 5.2 (6) (c)-(f) newpath._path = self._prefix(self._path) + self._segments2path( _normalizepath( self.segments[:-1] + # RFC2396, Section 5.2 (6) (a) other.segments # RFC2396, Section 5.2 (6) (b) ) ) newpath._segments = None return newpath elif isinstance(other, (list, tuple)): # this makes path/list possible return other.__class__(self/path for path in other) else: # this makes path/generator possible return (self/path for path in other) def __rdiv__(self, other): """ Right hand version of __div__. This supports list and generators as the left hand side too. """ if isinstance(other, basestring): other = Path(other) if isinstance(other, Path): return other/self elif isinstance(other, (list, tuple)): return other.__class__(path/self for path in other) else: return (path/self for path in other) def relative(self, basepath): """ Return an relative Path rel such that basepath/rel == , i.e. this is the inverse operation of __div__. If is relative, an identical copy of will be returned. """ # if self is relative don't do anything if not self.isabs: pass # FIXME return self.clone() basepath = Path(basepath) # clone/coerce self_segments = _normalizepath(self.segments) base_segments = _normalizepath(basepath.segments) while len(self_segments)>1 and len(base_segments)>1 and self_segments[0]==base_segments[0]: del self_segments[0] del base_segments[0] # build a path from one file to the other self_segments[:0] = [(u"..",)]*(len(base_segments)-1) if not len(self_segments) or self_segments==[(u"",)]: self_segments = [(u".",), (u"",)] return Path(self._segments2path(self_segments)) def reverse(self): segments = self.segments segments.reverse() if segments and segments[0] == (u"",): del segments[0] segments.append((u"",)) self.segments = segments def normalize(self): self.segments = _normalizepath(self.segments) def normalized(self): new = self.clone() new.normalize() return new def local(self): """ Return converted to a filename using the file naming conventions of the OS. Parameters will be dropped in the resulting string. """ path = Path(self._prefix(self._path) + "/".join(segment[0] for segment in self)) path = path._path localpath = urllib.url2pathname(path) if path.endswith("/") and not (localpath.endswith(os.sep) or (os.altsep is not None and localpath.endswith(os.altsep))): localpath += os.sep return localpath def abs(self): """ Return an absolute version of . """ path = os.path.abspath(self.local()) path = path.rstrip(os.sep) if path.startswith("///"): path = path[2:] path = urllib.pathname2url(path.encode("utf-8")) if len(self) and self.segments[-1] == ("",): path += "/" return Path(path) def real(self): """ Return the canonical version of , eliminating all symbolic links. """ path = os.path.realpath(self.local()) path = path.rstrip(os.sep) path = urllib.pathname2url(path.encode("utf-8")) if path.startswith("///"): path = path[2:] if len(self) and self.segments[-1] == ("",): path += "/" return Path(path) class Query(dict): __slots__= () def __init__(self, arg=None, **kwargs): if arg is not None: if isinstance(arg, dict): for (key, value) in arg.iteritems(): self.add(key, value) else: for (key, value) in arg: self.add(key, value) for (key, value) in kwargs.iteritems(): self.add(key, value) def __setitem__(self, key, value): dict.__setitem__(self, unicode(key), [unicode(value)]) def add(self, key, *values): key = unicode(key) values = map(unicode, values) self.setdefault(key, []).extend(values) def __xrepr__(self, mode="default"): if mode == "cell": yield (astyle.style_url, str(self)) else: yield (astyle.style_url, repr(self)) class URL(object): """ An RFC2396 compliant &url;. """ def __init__(self, url=None): """ Create a new URL instance. url may be a str or unicode instance, or an URL (in which case you'll get of copy of url), or None (which will create an URL referring to the current document). """ self.url = url def _clear(self): # internal helper method that makes the ``self`` empty. self.reg = defaultreg self._scheme = None self._userinfo = None self._host = None self._port = None self._path = Path() self._reg_name = None self._query = None self._query_parts = None self._opaque_part = None self._frag = None def clone(self): """ Return an identical copy . """ return URL(self) @staticmethod def _checkscheme(scheme): # Check whether ``scheme`` contains only legal characters. if scheme[0] not in schemecharfirst: return False for c in scheme[1:]: if c not in schemechar: return False return True class scheme(misc.propclass): """ the &url; scheme (e.g. ftp, ssh, http or mailto). The scheme will be None if the &url; is a relative one. """ def __get__(self): return self._scheme def __set__(self, scheme): """ The scheme will be converted to lowercase on setting (if scheme is not None, otherwise the scheme will be deleted). """ if scheme is None: self._scheme = None else: scheme = scheme.lower() # check if the scheme only has allowed characters if not self._checkscheme(scheme): raise ValueError("Illegal scheme char in scheme %r" % (scheme, )) self._scheme = scheme self.reg = schemereg.get(scheme, defaultreg) def __delete__(self): """ Deletes the scheme, i.e. makes the &url; relative. """ self._scheme = None self.reg = defaultreg class userinfo(misc.propclass): """ the user info part of the URL; i.e. the user part of http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): return self._userinfo def __set__(self, userinfo): self._userinfo = userinfo def __delete__(self): self._userinfo = None class host(misc.propclass): """ the host part of the URL; i.e. the www.example.com part of http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): return self._host def __set__(self, host): if host is not None: host = host.lower() self._host = host def __delete__(self): self._host = None class port(misc.propclass): """ the port number of the URL (as an int) or None if the URL has none. The 8080 in http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): return self._port def __set__(self, port): if port is not None: port = int(port) self._port = port def __delete__(self): self._port = None class hostport(misc.propclass): """ the host and (if specified) the port number of the URL, i.e. the www.example.com:8080 in http://user@www.example.com:8080/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): if self.host is not None: hostport = _escape(self.host, safe) if self.port is not None: hostport += ":%d" % self.port return hostport else: return None def __set__(self, hostport): # find the port number (RFC2396, Section 3.2.2) if hostport is None: del self.hostport else: del self.port pos = hostport.rfind(":") if pos != -1: if pos != len(hostport)-1: self.port = hostport[pos+1:] hostport = hostport[:pos] self.host = _unescape(hostport) def __delete__(self): del self.host del self.port class server(misc.propclass): """ the server part of the URL; i.e. the user@www.example.com part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): if self.hostport is not None: userinfo = self.userinfo if userinfo is not None: return _escape(userinfo, safe) + "@" + self.hostport else: return self.hostport else: return None def __set__(self, server): """ Setting the server always works even if the current scheme does use opaque_part or reg_name, but will be ignored for url. """ if server is None: del self.server else: # find the userinfo (RFC2396, Section 3.2.2) pos = server.find("@") if pos != -1: self.userinfo = _unescape(server[:pos]) server = server[pos+1:] else: del self.userinfo self.hostport = server def __delete__(self): del self.userinfo del self.hostport class reg_name(misc.propclass): """ The reg_name part of the URL for hierarchical schemes that use a name based authority instead of server. """ def __get__(self): return self._reg_name def __set__(self, reg_name): if reg_name is None: del self.reg_name else: self._reg_name = reg_name def __delete__(self): self._reg_name = None class authority(misc.propclass): """ The authority part of the URL for hierarchical schemes. Depending on the scheme, this is either server or reg_name. """ def __get__(self): if self.reg.useserver: return self.server else: return self.reg_name def __set__(self, authority): if self.reg.useserver: self.server = authority else: self.reg_name = authority def __delete__(self): if self.reg.useserver: del self.server else: del self.reg_name class isabspath(misc.propclass): """ Specifies whether the path of a hierarchical URL is absolute, (i.e. it has a leading "/"). Note that the path will always be absolute if an authority is specified. """ def __get__(self): return (self.authority is not None) or self.path.isabs def __set__(self, isabspath): self.path.isabs = isabspath class path(misc.propclass): """ The path segments of a hierarchical URL as a Path object. """ def __get__(self): return self._path def __set__(self, path): self._path = Path(path) def __delete__(self): self._path = Path() class file(misc.propclass): """ The filename without the path, i.e. the name part of the last component of path. The baz.html part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): return self.path.file def __set__(self, file): """ Setting the filename preserves the parameter in the last segment. """ self.path.file = file def __delete__(self): """ Deleting the filename preserves the parameter in the last segment. """ del self.path.file class ext(misc.propclass): """ The filename extension of the last segment of the path. The html part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): return self.path.ext def __set__(self, ext): """ Setting the extension preserves the parameter in the last segment. """ self.path.ext = ext def __delete__(self): """ Deleting the extension preserves the parameter in the last segment. """ del self.path.ext class query_parts(misc.propclass): """ The query component as a dictionary, i.e. {u"spam": u"eggs"} from http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. If the query component couldn't be parsed, query_parts will be False. """ def __get__(self): return self._query_parts def __set__(self, query_parts): self._query = _urlencode(query_parts) self._query_parts = query_parts def __delete__(self): self._query = None self._query_parts = None class query(misc.propclass): """ The query component, i.e. the spam=eggs part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): if self._query_parts is False: return self._query else: return _urlencode(self._query_parts) def __set__(self, query): self._query = query if query is not None: parts = {} for part in query.split(u"&"): namevalue = part.split(u"=", 1) name = _unescape(namevalue[0].replace("+", " ")) if len(namevalue) == 2: value = _unescape(namevalue[1].replace("+", " ")) parts.setdefault(name, []).append(value) else: parts = False break query = parts self._query_parts = query def __delete__(self): self._query = None self._query_parts = None class opaque_part(misc.propclass): """ The opaque part (for schemes like mailto that are not hierarchical). """ def __get__(self): return self._opaque_part def __set__(self, opaque_part): self._opaque_part = opaque_part def __delete__(self): self._opaque_part = None class frag(misc.propclass): """ The fragment identifier, which references a part of the resource, i.e. the frag part of http://user@www.example.com/bar/baz.html;xyzzy?spam=eggs#frag. """ def __get__(self): return self._frag def __set__(self, frag): self._frag = frag def __delete__(self): self._frag = None class url(misc.propclass): """ The complete &url; """ def __get__(self): """ Getting url reassembles the &url; from the components. """ result = "" if self.scheme is not None: result += self.scheme + ":" if self.reg.usehierarchy: if self.authority is not None: result += "//" + self.authority if not self.path.isabs: result += "/" result += str(self.path) if self.query is not None: result += "?" + self.query else: result += self.opaque_part if self.reg.usefrag and self.frag is not None: result += "#" + _escape(self.frag, fragsafe) return result def __set__(self, url): """ Setting url parses url into the components. url may also be an URL instance, in which case the &url; will be copied. """ self._clear() if url is None: return elif isinstance(url, URL): self.scheme = url.scheme self.userinfo = url.userinfo self.host = url.host self.port = url.port self.path = url.path.clone() self.reg_name = url.reg_name self.opaque_part = url.opaque_part self.query = url.query self.frag = url.frag else: if isinstance(url, unicode): url = _escape(url) # find the scheme (RFC2396, Section 3.1) pos = url.find(":") if pos != -1: scheme = url[:pos] if self._checkscheme(scheme): # if the scheme is illegal assume there is none (e.g. "/foo.php?x=http://www.bar.com", will *not* have the scheme "/foo.php?x=http") self.scheme = scheme # the info about what we have to expect in the rest of the URL can be found in self.reg now url = url[pos+1:] # find the fragment (RFC2396, Section 4.1) if self.reg.usefrag: # the fragment itself may not contain a "#", so find the last "#" pos = url.rfind("#") if pos != -1: self.frag = _unescape(url[pos+1:]) url = url[:pos] if self.reg.usehierarchy: # find the query (RFC2396, Section 3.4) pos = url.rfind("?") if pos != -1: self.query = url[pos+1:] url = url[:pos] if url.startswith("//"): url = url[2:] # find the authority part (RFC2396, Section 3.2) pos = url.find("/") if pos!=-1: authority = url[:pos] url = url[pos:] # keep the "/" else: authority = url url = "/" self.authority = authority self.path = Path(url) else: self.opaque_part = url def __delete__(self): """ After deleting the &url; the resulting object will refer to the current document. """ self._clear() def withext(self, ext): """ Return a new URL where the filename extension has been replaced with ext. """ url = URL(self) url.path = url.path.withext(ext) return url def withoutext(self): """ Return a new URL where the filename extension has been removed. """ url = URL(self) url.path = url.path.withoutext() return url def withfile(self, file): """ Return a new URL where the filename (i.e. the name of last component of path_segments) has been replaced with filename. """ url = URL(self) url.path = url.path.withfile(file) return url def withoutfile(self): url = URL(self) url.path = url.path.withoutfile() return url def withfrag(self, frag): """ Return a new URL where the fragment has been replaced with frag. """ url = URL(self) url.frag = frag return url def withoutfrag(self): """ Return a new URL where the frag has been dropped. """ url = URL(self) del url.frag return url def __div__(self, other): """ join with another (possible relative) URL other, to form a new URL. other may be a str, unicode or URL instance. It may be None (referring to the current document) in which case will be returned. It may also be a list or other iterable. For this case a list (or iterator) will be returned where __div__ will be applied to every item in the list/iterator. E.g. the following expression returns all the files in the current directory as absolute URLs (see files and here for further explanations): >>> here = url.here() >>> for f in here/here.files(): ... print f """ if isinstance(other, basestring): other = URL(other) if isinstance(other, URL): newurl = URL() # RFC2396, Section 5.2 (2) if other.scheme is None and other.authority is None and str(other.path)=="" and other.query is None: newurl = URL(self) newurl.frag = other.frag return newurl if not self.reg.usehierarchy: # e.g. "mailto:x@y"/"file:foo" return other # In violation of RFC2396 we treat file URLs as relative ones (if the base is a local URL) if other.scheme=="file" and self.islocal(): del other.scheme del other.authority # RFC2396, Section 5.2 (3) if other.scheme is not None: return other newurl.scheme = self.scheme newurl.query = other.query newurl.frag = other.frag # RFC2396, Section 5.2 (4) if other.authority is None: newurl.authority = self.authority # RFC2396, Section 5.2 (5) & (6) (a) (b) newurl._path = self._path/other._path else: newurl.authority = other.authority newurl._path = other._path.clone() return newurl elif isinstance(other, (list, tuple)): # this makes path/list possible return other.__class__(self/path for path in other) else: # this makes path/generator possible return (self/path for path in other) def __rdiv__(self, other): """ Right hand version of __div__. This support lists and iterables as the left hand side too. """ if isinstance(other, basestring): other = URL(other) if isinstance(other, URL): return other/self elif isinstance(other, (list, tuple)): return other.__class__(item/self for item in other) else: return (item/self for item in other) def relative(self, baseurl): """ Return an relative URL rel such that baseurl/rel == , i.e. this is the inverse operation of __div__. If is relative, has a different scheme or authority than baseurl or a non-hierarchical scheme, an identical copy of will be returned. """ # if self is relative don't do anything if self.scheme is None: return URL(self) # javascript etc. if not self.reg.usehierarchy: return URL(self) baseurl = URL(baseurl) # clone/coerce # only calculate a new URL if to the same server, else use the original if self.scheme != baseurl.scheme or self.authority != baseurl.authority: return URL(self) newurl = URL(self) # clone del newurl.scheme del newurl.authority selfpath_segments = _normalizepath(self._path.segments) basepath_segments = _normalizepath(baseurl._path.segments) while len(selfpath_segments)>1 and len(basepath_segments)>1 and selfpath_segments[0]==basepath_segments[0]: del selfpath_segments[0] del basepath_segments[0] # does the URL go to the same file? if selfpath_segments==basepath_segments and self.query==baseurl.query: # only return the frag del newurl.path del newurl.query else: # build a path from one file to the other selfpath_segments[:0] = [(u"..",)]*(len(basepath_segments)-1) if not len(selfpath_segments) or selfpath_segments==[(u"",)]: selfpath_segments = [(u".",), (u"",)] newurl._path.segments = selfpath_segments newurl._path = self.path.relative(baseurl.path) newurl._path.isabs = False return newurl def __str__(self): return self.url def __unicode__(self): return self.url def __repr__(self): return "URL(%r)" % self.url def __nonzero__(self): """ Return whether the URL is not empty, i.e. whether it is not the URL referring to the start of the current document. """ return self.url != "" def __eq__(self, other): """ Return whether two URL instances are equal. Note that only properties relevant for the current scheme will be compared. """ if self.__class__!=other.__class__: return False if self.scheme!=other.scheme: return False if self.reg.usehierarchy: if self.reg.useserver: selfport = self.port or self.reg.defaultport otherport = other.port or other.reg.defaultport if self.userinfo!=other.userinfo or self.host!=other.host or selfport!=otherport: return False else: if self.reg_name!=other.reg_name: return False if self._path!=other._path: return False else: if self.opaque_part!=other.opaque_part: return False # Use canonical version of (i.e. sorted names and values) if self.query != other.query: return False if self.frag != other.frag: return False return True def __ne__(self, other): """ Return whether two URLs are not equal. """ return not self==other def __hash__(self): """ Return a hash value for , to be able to use URLs as dictionary keys. You must be careful not to modify an URL as soon as you use it as a dictionary key. """ res = hash(self.scheme) if self.reg.usehierarchy: if self.reg.useserver: res ^= hash(self.userinfo) res ^= hash(self.host) res ^= hash(self.port or self.reg.defaultport) else: res ^= hash(self.reg_name) res ^= hash(self._path) else: res ^= hash(self.opaque_part) res ^= hash(self.query) res ^= hash(self.frag) return res def abs(self, scheme=-1): """ Return an absolute version of (works only for local &url;s). If the argument scheme is specified, it will be used for the resulting &url; otherwise the result will have the same scheme as . """ self._checklocal() new = self.clone() new.path = self.path.abs() if scheme != -1: new.scheme = scheme return new def real(self, scheme=-1): """ Return the canonical version of , eliminating of symbolic links (works only for local &url;s). If the argument scheme is specified, it will be used for the resulting &url; otherwise the result will have the same scheme as . """ self._checklocal() new = self.clone() new.path = self.path.real() if scheme != -1: new.scheme = scheme return new def islocal(self): """ return whether refers to a local file, i.e. whether is a relative URL or the scheme is root or file). """ return self.reg.islocal def _checklocal(self): if not self.islocal(): raise ValueError("URL %r is not local" % self) def local(self): """ Return as a local filename (which will only works if is local (see islocal)). """ self._checklocal() return self.path.local() def _connect(self, context=None, **kwargs): return self.reg._connect(self, context=context, **kwargs) def connect(self, context=None, **kwargs): """ Return a Connection object for accessing and modifying s metadata. Whether you get a new connection object, or an existing one depends on the scheme, the &url; itself, and the context passed in (as the context argument). """ return self._connect(context, **kwargs)[0] def open(self, mode="rb", context=None, *args, **kwargs): """ Open for reading or writing. open returns a Resource object. Which additional parameters are supported depends on the actual resource created. Some common parameters are: mode (supported by all resources) A string indicating how the file is to be opened (just like the mode argument for the builtin open; e.g. "rb" or "wb"). context (supported by all resources) open needs a connection for this &url; which it gets from a context object. headers Additional headers to use for an &http; request. data Request body to use for an &http; POST request. remotepython Name of the Python interpreter to use on the remote side (used by ssh &url;s) identity filename to be used as the identity file (private key) for authentication (used by ssh &url;s) """ (connection, kwargs) = self._connect(context=context, **kwargs) return connection.open(self, mode, *args, **kwargs) def openread(self, context=None, *args, **kwargs): return self.open("rb", context, *args, **kwargs) def openwrite(self, context=None, *args, **kwargs): return self.open("wb", context, *args, **kwargs) def __getattr__(self, name): """ __getattr__ forwards every unresolved attribute access to the appropriate connection. This makes it possible to call Connection methods directly on URL objects: >>> from ll import url >>> u = url.URL("file:README") >>> u.size() 1584L instead of: >>> from ll import url >>> u = url.URL("file:README") >>> u.connect().size(u) 1584L """ def realattr(*args, **kwargs): try: context = kwargs["context"] except KeyError: context = None else: kwargs = kwargs.copy() del kwargs["context"] (connection, kwargs) = self._connect(context=context, **kwargs) return getattr(connection, name)(self, *args, **kwargs) return realattr def import_(self, mode="always"): """ import the file as a Python module. The file extension will be ignored, which means that you might not get exactly the file you specified. mode can have the following values: "always" (the default): The module will be imported on every call; "once": The module will be imported only on the first call; "new": The module will be imported every time it has changed since the last call. """ filename = self.real().local() if mode=="always": mdate = self.mdate() elif mode=="once": try: return importcache[filename][1] except KeyError: mdate = self.mdate() elif mode=="new": mdate = self.mdate() try: (oldmdate, module) = importcache[filename] except KeyError: pass else: if mdate<=oldmdate: return module else: raise ValueError, "mode %r unknown" % mode module = _import(filename) importcache[filename] = (mdate, module) return module def __iter__(self): try: isdir = self.isdir() except AttributeError: isdir = False if isdir: return iter(self/self.listdir()) else: return iter(self.open()) def __xrepr__(self, mode="default"): if mode == "cell": yield (astyle.style_url, str(self)) else: yield (astyle.style_url, repr(self)) warnings.filterwarnings("always", module="url")