""" URLFetch.py
Module for retrieving data from a URL.
"""
__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc"
__license__ = """
Straw is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version.
Straw is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place - Suite 330, Boston, MA 02111-1307, USA. """
import httplib_async
import asyncore
import urlparse
import base64
import gzip
try:
from cStringIO import StringIO
except:
from StringIO import StringIO
import error
import time
import NetworkConstants
import Config
import LookupManager
import constants
def is_ip(host):
    """Return 1 if *host* is a dotted-quad IPv4 address, else 0.

    The host must have exactly four dot-separated parts, each made only of
    decimal digits and in the range 0..255.  A literal IP lets the caller
    skip the host-name lookup, so anything that is not unambiguously an
    address must be rejected.
    """
    parts = host.split(".")
    if len(parts) != 4:
        return 0
    for part in parts:
        # int() alone would also accept "+4", "-0" or " 4" (and "1_0" on
        # newer Pythons), none of which belong in an IPv4 address.
        if not part.isdigit():
            return 0
        try:
            value = int(part)
        except ValueError:
            # isdigit() admits some non-ASCII digit characters int() rejects
            return 0
        if value > 255:
            return 0
    return 1
class RequestSchemeException(Exception):
    """Raised for a URI whose scheme is not http.

    The offending scheme is available as ``scheme`` and, so that the
    exception is self-describing, also as ``args[0]`` / ``str(exc)``.
    """
    def __init__(self, scheme):
        # Hand the scheme to Exception so args/str() are informative; it
        # was previously replaced by an empty dict, leaving no message.
        Exception.__init__(self, scheme)
        self.scheme = scheme
class Request:
    """Plain record of everything needed to issue one HTTP GET.

    Every field defaults to None.  ConnectionManager.request() fills in the
    target, headers, credentials, priority, consumer and URI; ``ip`` is set
    later, once the host name is known to be (or resolves to) an address.
    """
    def __init__(self, host=None, port=None, path=None, ip=None,
                 headers=None, user=None, password=None, priority=None,
                 consumer=None, uri=None):
        # where to connect and what to fetch
        self.host, self.port, self.path = host, port, path
        # resolved address, if already known
        self.ip = ip
        # extra request headers and queue ordering
        self.headers, self.priority = headers, priority
        # credentials for HTTP basic authentication
        self.user, self.password = user, password
        # receiver of the outcome, and the URI as originally requested
        self.consumer, self.uri = consumer, uri
class Caller:
    """One-shot wrapper around a zero-argument callable.

    The first call invokes the wrapped callable and then drops the reference
    to it, even if the call raised; every later call does nothing.
    """
    def __init__(self, f):
        self._f = f

    def __call__(self):
        try:
            target = self._f
        except AttributeError:
            # already fired once; nothing left to call
            return
        try:
            target()
        finally:
            del self._f
class ConnectionManager:
    """Queue HTTP GET requests by priority and drive them through asyncore.

    request() wraps a URI in a ConsumerAdapter.  Unless the host is a
    literal IP or a proxy is configured, the host name is first resolved
    through LookupManager; the adapter is then queued.  poll() starts queued
    adapters while fewer than NetworkConstants.MAX_CONNECTIONS sockets are
    open, pumps the lookup manager and asyncore, and aborts downloads that
    have run longer than NetworkConstants.MAX_DOWNLOAD_TIME.
    """

    def __init__(self):
        # pending ConsumerAdapters, ordered by ascending request priority
        self._queue = []

    def request(self, uri, consumer, headers=None, user=None, password=None,
                priority=NetworkConstants.PRIORITY_DEFAULT):
        """Schedule an HTTP GET of *uri*; *consumer* receives the outcome.

        headers -- extra request headers; the mapping is copied, never
                   modified.
        user, password -- credentials for HTTP basic authentication.
        priority -- queue ordering; lower values are started first.

        Returns a one-shot callable that stops the request.
        Raises RequestSchemeException if the URI's scheme is not http.
        """
        uri = uri.strip()
        scheme, host, path, params, query, fragment = urlparse.urlparse(uri)
        if scheme != "http":
            raise RequestSchemeException(scheme)
        try:
            host, port = host.split(":", 1)
            port = int(port)
        except (TypeError, ValueError):
            port = 80  # default port
        if not path:
            path = "/"
        if params:
            path = path + ";" + params
        if query:
            path = path + "?" + query
        # Work on a private copy: this method and ConsumerAdapter add
        # authentication/encoding headers to it.  Mutating the caller's dict
        # (or a shared default) would leak e.g. an Authorization header into
        # unrelated later requests.
        headers = dict(headers or {})
        req = Request(host=host, port=port, path=path, headers=headers,
                      user=user, password=password, priority=priority,
                      consumer=consumer, uri=uri)
        pc = Config.get_instance().proxy_config
        if pc.use and pc.use_authentication:
            # The proxy address itself is applied in poll(), because its ip
            # may not be known yet; the Proxy-Authorization header can be set
            # now.  b64encode (unlike encodestring) never inserts line
            # breaks, which would split the header for long credentials.
            credentials = base64.b64encode('%s:%s' % (pc.user, pc.password))
            req.headers['Proxy-Authorization'] = 'Basic %s' % credentials
        adapter = ConsumerAdapter(req)
        # No lookup is necessary if host is an ip address; if a proxy is in
        # use, let it handle the lookup.
        if is_ip(host):
            adapter.request.ip = host
            self._queue_request(adapter)
        elif pc.use:
            self._queue_request(adapter)
        else:
            self._lookup(adapter)
        # Don't return the ConsumerAdapter object: nothing in it except
        # stop() should be called from other parts of Straw.
        return Caller(adapter.stop)

    def _lookup(self, adapter):
        """Start a host-name lookup for *adapter* with _request_resolved as callback.

        A malformed host name fails the adapter instead of raising.
        """
        try:
            LookupManager.get_instance().lookup(
                adapter.request.host, self._request_resolved, adapter)
        except LookupManager.NameFormatException as e:
            adapter.http_failed(e)

    def _queue_request(self, adapter):
        """Insert *adapter* after all queued adapters of equal or lower priority value."""
        priority = adapter.request.priority
        for index, queued in enumerate(self._queue):
            if queued.request.priority > priority:
                self._queue.insert(index, adapter)
                return
        self._queue.append(adapter)

    def _request_resolved(self, host, ip, adapter):
        """Lookup callback: queue *adapter* if an ip was found, otherwise fail it."""
        if ip is not None and ip != "":
            adapter.request.ip = ip
            self._queue_request(adapter)
        else:
            adapter.http_failed(_("Host name lookup failed"))

    def poll(self, timeout=0.1):
        """Run one round of the network loop with *timeout* seconds as time budget.

        Returns a true value while requests are queued or sockets are open.
        """
        proxy_config = Config.get_instance().proxy_config
        lookup_manager = LookupManager.get_instance()
        # activate up to MAX_CONNECTIONS channels
        while (self._queue
               and len(asyncore.socket_map) < NetworkConstants.MAX_CONNECTIONS
               and not proxy_config.is_waiting):
            adapter = self._queue.pop(0)
            if not proxy_config.use and not adapter.request.ip:
                # the proxy was switched off after this request was queued,
                # so its host name still needs resolving
                self._lookup(adapter)
            else:
                if proxy_config.use:
                    adapter.set_proxy((proxy_config.ip, proxy_config.port))
                adapter.start()
        # keep the network running; the part of the budget the lookup
        # manager does not use goes to asyncore
        now = time.time()
        lookup_manager.poll(timeout)
        timeout -= (time.time() - now)
        if timeout > 0.0:
            asyncore.poll(timeout)
        # time out stuck consumers
        self.time_out_consumers()
        # return non-zero if we should keep polling
        return len(self._queue) or len(asyncore.socket_map)

    def time_out_consumers(self):
        """Abort every started download older than NetworkConstants.MAX_DOWNLOAD_TIME."""
        now = time.time()
        # Iterate over a snapshot: time_exceeded() closes connections, which
        # may remove entries from asyncore.socket_map.
        for channel in list(asyncore.socket_map.values()):
            adapter = getattr(channel, "consumer", None)
            start_time = getattr(adapter, "start_time", None)
            if start_time is None:
                # not one of our HTTP connections, or not started yet
                continue
            if now - start_time > NetworkConstants.MAX_DOWNLOAD_TIME:
                adapter.time_exceeded()

# module-wide instance; ConsumerAdapter.http_redirect issues follow-up
# requests through it
connection_manager = ConnectionManager()
class ConsumerAdapter(object):
    """Bridge between one HTTPConnection_async and the consumer of a request.

    It receives the connection's callbacks (http_header, feed,
    http_redirect, http_close, http_failed), accumulates the body, and
    notifies the consumer at most once, via http_results() or http_failed().
    ConnectionManager drives it through start(), set_proxy(),
    time_exceeded() and stop().
    """
    # lifecycle states
    CREATED = 0
    STARTED = 1
    CLOSED = 2

    # TODO: we do nothing with the proxy?
    def __init__(self, req):
        self.connection = httplib_async.HTTPConnection_async(req.host, req.ip, req.port, self)
        self.consumer = req.consumer
        self._data = ""
        self.header = None
        self.status = None
        self._request = req
        # set once the consumer has been notified, so that it happens only once
        self.finished = False
        # time.time() when start() ran; read by ConnectionManager's timeout sweep
        self.start_time = None
        self.state = self.CREATED
        if req.headers is None:
            # Request() defaults headers to None; we need a dict to add to
            req.headers = {}
        if req.user and req.password:
            # b64encode never inserts line breaks (encodestring does every 76
            # characters), so long credentials still form one header line
            credentials = base64.b64encode('%s:%s' % (req.user, req.password))
            req.headers['Authorization'] = 'Basic %s' % credentials
        req.headers['Accept-encoding'] = 'gzip'
        req.headers['User-agent'] = 'Straw/%s' % constants.VERSION

    # interface used by ConnectionManager
    def start(self):
        """Send the GET request; an immediate failure is reported to the consumer."""
        if self.state != self.CREATED:
            return
        self.state = self.STARTED
        self.start_time = time.time()
        try:
            self.connection.request(
                "GET", self._request.path, self._request.headers)
            self.connection.execute()
        except Exception as ex:
            self._send_failed(ex)
            self._close_connection()
            self.state = self.CLOSED

    def set_proxy(self, proxy):
        """Forward the (ip, port) proxy address to the connection."""
        self.connection.set_proxy(proxy)

    def time_exceeded(self):
        """Abort a started download that ran too long and report the failure."""
        if self.state != self.STARTED:
            return
        self.state = self.CLOSED
        self._close_connection()
        self._send_failed(_("Maximum download time exceeded"))

    def get_request(self):
        return self._request
    request = property(get_request)

    def stop(self):
        """Abort the request if still open and tell the consumer it was stopped."""
        if self.state != self.CLOSED:
            self.state = self.CLOSED
            self._close_connection()
            self.consumer.operation_stopped()

    # the following methods are the interface HTTPConnection_async uses
    def http_failed(self, exception):
        """Called by HTTPConnection_async when connection failed with
        exception. Also used by ConnectionManager if host name lookup failed.
        """
        self._send_failed(exception)
        self._close_connection()
        self.state = self.CLOSED

    def http_header(self, status, header):
        """Called by HTTPConnection_async with status and header.

        A response announcing a zero-length body is delivered immediately.
        """
        self.status = status
        self.header = header
        if self._content_length() == 0:
            self._send_results()
            self._close_connection()
            self.state = self.CLOSED

    def http_redirect(self, location, permanent = 0):
        """Called by HTTPConnection_async with the new location in case of 301 or 302.

        A relative location is resolved against the original URI.  The
        follow-up request reuses this request's headers, credentials and
        priority.  If it cannot be issued because its scheme is not http,
        the consumer is told via http_failed().
        """
        assert type(location) == type(''), "Invalid redirect"
        if urlparse.urlparse(location)[0] != 'http':
            location = urlparse.urljoin(self.request.uri, location)
        if permanent:
            self.consumer.http_permanent_redirect(location)
        try:
            connection_manager.request(
                location, self.consumer, self.request.headers,
                self.request.user, self.request.password,
                self.request.priority)
        except RequestSchemeException as ex:
            self._send_failed(ex)
        else:
            # the consumer now belongs to the follow-up request; this
            # adapter must not notify it again
            self.finished = True
        self._close_connection()
        self.state = self.CLOSED

    def feed(self, data):
        """Called by HTTPConnection_async with (part of the) data, after http_header.

        Delivers the body once Content-Length bytes have arrived; fails the
        download once NetworkConstants.MAX_DOWNLOAD_SIZE is reached first.
        """
        self._data += data
        datalength = len(self._data)
        expected = self._content_length()
        if expected is not None and datalength >= expected:
            self._close_connection()
            self._send_results()
            self.state = self.CLOSED
        elif datalength >= NetworkConstants.MAX_DOWNLOAD_SIZE:
            self._close_connection()
            self._send_failed(_("Maximum download file size exceeded"))
            self.state = self.CLOSED

    def http_close(self):
        """Called by HTTPConnection_async when the connection is closed.

        Delivers what was received, unless the server announced a longer
        Content-Length, in which case the download is reported as failed.
        """
        expected = self._content_length() if self.header else None
        if expected is not None and len(self._data) < expected:
            # NOTE(review): the body is truncated rather than empty; the
            # message is kept as-is because it is a translation key.
            msg = _("Feed is empty.")
            self._send_failed(msg)
        else:
            self._send_results()
        self._close_connection()
        self.state = self.CLOSED

    # internal
    def _content_length(self):
        """Return the Content-Length header as an int, or None if absent or malformed."""
        if self.header is None:
            return None
        value = self.header.getheader('content-length')
        if value is None:
            return None
        try:
            return int(value)
        except ValueError:
            return None

    def _send_failed(self, data):
        """Report a failure to the consumer unless it was already notified."""
        if not self.finished:
            self.finished = True
            self.consumer.http_failed(data)

    def _send_results(self):
        """Deliver status, header and body to the consumer unless already notified.

        A gzip-encoded body is decompressed first; if that fails the consumer
        receives http_failed() instead.
        """
        if self.finished:
            return
        self.finished = True
        if self.header and self.header.getheader(
                'content-encoding') == 'gzip':
            try:
                self._data = gzip.GzipFile(
                    fileobj = StringIO(self._data)).read()
            except Exception as ex:
                # corrupt or truncated gzip data: ``finished`` is already
                # set, so without this the consumer would never be notified
                self.consumer.http_failed(ex)
                return
        self.consumer.http_results(self.status, self.header, self._data)

    def _close_connection(self):
        """Close and forget the connection; a no-op if it is already gone."""
        if not hasattr(self, "connection"):
            return
        try:
            self.connection.close()
        except Exception:
            # silently ignore errors: if it didn't succeed, it wasn't
            # open in the first place. we don't care.
            pass
        del self.connection

    def __str__(self):
        host = "Unknown"
        path = "Unknown"
        if getattr(self, "request", None) is not None:
            host = self.request.host
            path = self.request.path
        return '<%s for %s %s>' % (repr(self), host, path)
# syntax highlighted by Code2HTML, v. 0.9.1