""" URLFetch.py

Module for retrieving data from a URL.

"""
__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc"
__license__ = """
Straw is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version.

Straw is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place - Suite 330, Boston, MA 02111-1307, USA. """

import httplib_async
import asyncore
import urlparse
import base64
import gzip
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
import error
import time
import NetworkConstants
import Config
import LookupManager
import constants


def is_ip(host):
    """Return 1 if host is a dotted-quad IPv4 address, 0 otherwise.

    Each of the four dot-separated parts must consist of digits only and
    be in the range 0..255.
    """
    parts = host.split(".")
    if len(parts) != 4:
        return 0
    for part in parts:
        # int() alone would also accept "+1", "-0" or " 1"; insist on digits.
        if not part.isdigit():
            return 0
        try:
            value = int(part)
        except (ValueError, TypeError):
            return 0
        if value > 255:
            return 0
    return 1


def _basic_credentials(user, password):
    """Return the value of a Basic (Proxy-)Authorization header."""
    # b64encode never wraps its output; encodestring inserts a newline every
    # 76 characters, which would break the header for long credentials.
    return 'Basic %s' % base64.b64encode('%s:%s' % (user, password))


class RequestSchemeException(Exception):
    """Raised by ConnectionManager.request for any scheme other than http.

    The offending scheme is available as the ``scheme`` attribute.
    """

    def __init__(self, scheme):
        Exception.__init__(self)
        self.scheme = scheme
        # NOTE(review): args is deliberately left empty here, so the scheme
        # has to be read from .scheme rather than from the exception text.
        self.args = {}


class Request:
    """Plain record holding everything known about one HTTP request."""

    def __init__(self, host = None, port = None, path = None, ip = None,
                 headers = None, user = None, password = None,
                 priority = None, consumer = None, uri = None):
        self.host = host
        self.port = port
        self.path = path
        self.ip = ip
        self.headers = headers
        self.user = user
        self.password = password
        self.priority = priority
        self.consumer = consumer
        self.uri = uri


class Caller:
    """Wrap a callable so that it is invoked at most once.

    The first call runs the wrapped callable and then drops the reference
    to it (even if it raised); later calls do nothing.
    """

    def __init__(self, f):
        self._f = f

    def __call__(self):
        if not hasattr(self, "_f"):
            return
        try:
            self._f()
        finally:
            del self._f


class ConnectionManager:
    """Queue HTTP requests by priority and drive them through asyncore."""

    def __init__(self):
        # ConsumerAdapter objects waiting to be started, ordered by priority.
        self._queue = []

    def request(self, uri, consumer, headers=None, user=None, password=None,
                priority=NetworkConstants.PRIORITY_DEFAULT):
        """Schedule a GET of uri and report the outcome to consumer.

        headers, if given, is copied: the caller's dictionary is never
        modified, and headers added for one request (credentials in
        particular) are not carried over to another.

        Returns a callable that stops the request when invoked.
        Raises RequestSchemeException if uri is not an http URI.
        """
        uri = uri.strip()
        scheme, host, path, params, query, fragment = urlparse.urlparse(uri)
        if scheme != "http":
            raise RequestSchemeException(scheme)

        port = 80  # default port
        if ":" in host:
            host, port_text = host.split(":", 1)
            try:
                port = int(port_text)
            except (TypeError, ValueError):
                port = 80

        if not path:
            path = "/"
        if params:
            path = path + ";" + params
        if query:
            path = path + "?" + query

        # Work on a private copy; the adapter and the proxy code below add
        # their own entries to the request headers.
        headers = dict(headers or {})

        req = Request(host = host, port = port, path = path,
                      headers = headers, user = user, password = password,
                      priority = priority, consumer = consumer, uri = uri)

        pc = Config.get_instance().proxy_config
        if pc.use:
            # we don't set ConsumerAdapter's proxy here because we might not
            # yet know its ip. However, we can set the Proxy-Authorization
            # line
            if pc.use_authentication:
                req.headers['Proxy-Authorization'] = _basic_credentials(
                    pc.user, pc.password)

        adapter = ConsumerAdapter(req)

        # no lookup necessary if host is an ip address.
        # if we are using a proxy, let it handle the lookup.
        if is_ip(host):
            adapter.request.ip = host
            self._queue_request(adapter)
        elif pc.use:
            self._queue_request(adapter)
        else:
            self._lookup(adapter)

        # don't return the adapter object itself, because nothing in
        # it except stop() should be called from other parts of Straw
        return Caller(adapter.stop)

    def _lookup(self, adapter):
        """Start a host name lookup for adapter's request.

        A malformed host name is reported to the adapter as a failed
        request instead of being raised to the caller.
        """
        try:
            LookupManager.get_instance().lookup(
                adapter.request.host, self._request_resolved, adapter)
        except LookupManager.NameFormatException as e:
            adapter.http_failed(e)

    def _queue_request(self, adapter):
        """Insert adapter before the first queued entry whose priority
        value is greater than its own, or at the end if there is none."""
        for index, queued in enumerate(self._queue):
            if queued.request.priority > adapter.request.priority:
                self._queue.insert(index, adapter)
                return
        self._queue.append(adapter)

    def _request_resolved(self, host, ip, adapter):
        """Lookup callback: queue the adapter, or fail it if ip is empty."""
        if ip is not None and ip != "":
            adapter.request.ip = ip
            self._queue_request(adapter)
        else:
            adapter.http_failed(_("Host name lookup failed"))

    def poll(self, timeout=0.1):
        """Start queued requests and run the network for up to timeout
        seconds.

        Returns a true value while there is still something queued or a
        socket registered with asyncore, i.e. while polling should go on.
        """
        proxy_config = Config.get_instance().proxy_config
        lookup_manager = LookupManager.get_instance()

        # activate up to MAX_CONNECTIONS channels
        while (self._queue
               and len(asyncore.socket_map) < NetworkConstants.MAX_CONNECTIONS
               and not proxy_config.is_waiting):
            adapter = self._queue.pop(0)
            # has the user switched off the proxy after this request was
            # queued? then the host still has to be resolved
            if (not proxy_config.use) and (not adapter.request.ip):
                self._lookup(adapter)
            else:
                if proxy_config.use:
                    adapter.set_proxy((proxy_config.ip, proxy_config.port))
                adapter.start()

        # keep the network running; the lookups get first go at the time
        # budget and asyncore gets whatever is left of it
        now = time.time()
        lookup_manager.poll(timeout)
        timeout -= (time.time() - now)
        if timeout > 0.0:
            asyncore.poll(timeout)

        # time out stuck consumers
        self.time_out_consumers()

        # return non-zero if we should keep polling
        return len(self._queue) or len(asyncore.socket_map)

    def time_out_consumers(self):
        """Abort every active download that exceeded MAX_DOWNLOAD_TIME."""
        now = time.time()
        # iterate over a snapshot: timing out a consumer closes its
        # connection
        for obj in list(asyncore.socket_map.values()):
            pc = obj.consumer
            if now - pc.start_time > NetworkConstants.MAX_DOWNLOAD_TIME:
                pc.time_exceeded()


connection_manager = ConnectionManager()


class ConsumerAdapter(object):
    """Glue between one HTTPConnection_async and the consumer of a request.

    Collects the response, enforces the size limit and makes sure the
    consumer is told about the outcome (results or failure) at most once.
    """

    CREATED = 0
    STARTED = 1
    CLOSED = 2

    # TODO: we do nothing with the proxy?
    def __init__(self, req):
        self.connection = httplib_async.HTTPConnection_async(
            req.host, req.ip, req.port, self)
        self.consumer = req.consumer
        self._data = ""
        self.header = None
        self.status = None
        self._request = req
        # set once the consumer has been given results or a failure
        self.finished = False
        self.start_time = None
        self.state = self.CREATED
        if req.headers is None:
            # Request() defaults headers to None; we need a dictionary
            req.headers = {}
        if req.user and req.password:
            req.headers['Authorization'] = _basic_credentials(
                req.user, req.password)
        req.headers['Accept-encoding'] = 'gzip'
        req.headers['User-agent'] = 'Straw/%s' % constants.VERSION

    # interface used by ConnectionManager

    def start(self):
        """Send the GET request. Does nothing unless the state is CREATED."""
        if self.state != self.CREATED:
            return
        self.state = self.STARTED
        self.start_time = time.time()
        try:
            self.connection.request(
                "GET", self._request.path, self._request.headers)
            self.connection.execute()
        except Exception as ex:
            self._send_failed(ex)
            self._close_connection()
            self.state = self.CLOSED

    def set_proxy(self, proxy):
        """Route the connection through proxy, an (ip, port) tuple."""
        # An adapter that was stopped or failed while still queued no
        # longer has a connection to configure.
        if self.state != self.CREATED:
            return
        self.connection.set_proxy(proxy)

    def time_exceeded(self):
        """Abort a started download and report the time-out as a failure."""
        if self.state != self.STARTED:
            return
        self.state = self.CLOSED
        self._close_connection()
        self._send_failed(_("Maximum download time exceeded"))

    def get_request(self):
        return self._request

    request = property(get_request)

    def stop(self):
        """Close the connection and tell the consumer it was stopped."""
        if self.state != self.CLOSED:
            self.state = self.CLOSED
            self._close_connection()
            self.consumer.operation_stopped()

    # the following methods are the interface HTTPConnection_async uses

    def http_failed(self, exception):
        """Called by HTTPConnection_async when connection failed with
        exception. Also used by ConnectionManager if host name lookup
        failed.
        """
        self._send_failed(exception)
        self._close_connection()
        self.state = self.CLOSED

    def http_header(self, status, header):
        """Called by HTTPConnection_async with status and header"""
        self.status = status
        self.header = header
        if header.getheader('content-length') == '0':
            # nothing will follow, so deliver the (empty) result right away
            self._send_results()
            self._close_connection()
            self.state = self.CLOSED

    def http_redirect(self, location, permanent = 0):
        """Called by HTTPConnection_async with the new location in case of
        301 or 302"""
        assert isinstance(location, str), "Invalid redirect"
        if urlparse.urlparse(location)[0] != 'http':
            # relative location: resolve it against the requested URI
            location = urlparse.urljoin(self.request.uri, location)
        try:
            if permanent:
                self.consumer.http_permanent_redirect(location)
            try:
                connection_manager.request(
                    location, self.consumer, self.request.headers,
                    self.request.user, self.request.password,
                    self.request.priority)
            except RequestSchemeException:
                # e.g. a redirect to https: we cannot follow it, so let the
                # consumer know instead of leaving it waiting
                self._send_failed(
                    _("Redirected to an unsupported URL scheme"))
        finally:
            self._close_connection()
            self.state = self.CLOSED

    def feed(self, data):
        """Called by HTTPConnection_async with (part of the) data, after
        http_header"""
        self._data += data
        received = len(self._data)
        expected = self._content_length()
        if expected is not None and received >= expected:
            self._close_connection()
            self._send_results()
            self.state = self.CLOSED
        elif received >= NetworkConstants.MAX_DOWNLOAD_SIZE:
            self._close_connection()
            self._send_failed(_("Maximum download file size exceeded"))
            self.state = self.CLOSED

    def http_close(self):
        """Called by HTTPConsumer_async when the connection is closed"""
        expected = self._content_length()
        if expected is not None and len(self._data) < expected:
            # the connection went away before the announced length arrived
            msg = _("Feed is empty.")
            self._send_failed(msg)
        else:
            self._send_results()
        self._close_connection()
        self.state = self.CLOSED

    # internal

    def _content_length(self):
        """Return the announced Content-Length as an int.

        Returns None if no header has been received yet, if it carries no
        Content-Length, or if the value is not a number.
        """
        if not self.header:
            return None
        value = self.header.getheader('content-length')
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def _send_failed(self, data):
        """Report a failure to the consumer unless it was already served."""
        if not self.finished:
            self.finished = True
            self.consumer.http_failed(data)

    def _send_results(self):
        """Hand the (decompressed) body to the consumer, at most once."""
        if self.finished:
            return
        self.finished = True
        if self.header and self.header.getheader(
                'content-encoding') == 'gzip':
            try:
                self._data = gzip.GzipFile(
                    fileobj = StringIO(self._data)).read()
            except Exception as ex:
                # a truncated or corrupt body must still reach the consumer
                # as a failure; finished is already set at this point
                self.consumer.http_failed(ex)
                return
        self.consumer.http_results(self.status, self.header, self._data)

    def _close_connection(self):
        """Close and forget the connection; safe to call more than once."""
        if hasattr(self, "connection"):
            try:
                self.connection.close()
            except Exception:
                # silently ignore errors: if it didn't succeed, it wasn't
                # open in the first place. we don't care.
                pass
            del self.connection

    def __str__(self):
        host = "Unknown"
        path = "Unknown"
        if getattr(self, "request", None) is not None:
            host = self.request.host
            path = self.request.path
        return '<%s for %s %s>' % (repr(self), host, path)