""" PollManager.py Module for polling the feeds. """ __copyright__ = "Copyright (c) 2002-4 Juri Pakaste " __license__ = """ GNU General Public License Straw is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. Straw is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ import Config import FeedList import Event from MainloopManager import MainloopManager import MessageManager import NetworkConstants import URLFetch import Feed import FeedCategoryList import SummaryParser import utils import time import socket import error class PollStopper(object, Event.SignalEmitter): def __init__(self, stopper, object): Event.SignalEmitter.__init__(self) self.initialize_slots(Event.PollingStoppedSignal) self._stopper = stopper self._object = object stopper = property(lambda self: self._stopper) object = property(lambda self: self._object) def stop(self): if self._stopper is not None: self._stopper() self._stopper = None self.emit_signal(Event.PollingStoppedSignal(self)) class PollManager: def __init__(self): self._config = Config.get_instance() self._mmgr = MessageManager.get_instance() self._feedlist = FeedList.get_instance() self._config.signal_connect(Event.PollFrequencyChangedSignal, self.poll_changed) self._config.signal_connect(Event.OfflineModeChangedSignal, self.offline_changed) self._feedlist.signal_connect(Event.FeedsChangedSignal, self.feeds_changed) self._is_offline = self._config.offline self._poll_frequency = self._config.poll_frequency self._last_poll = self._config.last_poll self._feeds = self._feedlist.flatten_list() self._pollers = {} def poll_changed(self, signal): """ called when global poll frequency has changed""" self._poll_frequency = signal.value def offline_changed(self, signal): self._is_offline = self._config.offline def feeds_changed(self, *args): self._feedlist = FeedList.get_instance() self._feeds = self._feedlist.flatten_list() def start_polling_loop(self): mlmgr = MainloopManager.get_instance() mlmgr.set_repeating_timer(NetworkConstants.POLL_INTERVAL, self.maybe_poll) ########################## # Why both pollcontext and self._pollers? # self._pollers is there to make sure that there are no simultaneous # pollers on the same object. So the user doesn't accidentally start # a poller for an object being polled. # pollcontext is there so that we can avoid starting pollers many # times - even though they wouldn't overlap - on the same object within # one polling run. # because maybe_poll is used as an idle handler, it must always return # True or else it's removed from the handler list def maybe_poll(self): self._maybe_poll() def _maybe_poll(self): now = int(time.time()) if self._is_offline: return True try: context = {} config_pf = self._config.poll_frequency feed_use_default = Feed.Feed.DEFAULT self.poll([feed for feed, timediff, fpf in [(feed, now - feed.last_poll, feed.poll_frequency) for feed in self._feeds] if ((timediff > fpf > 0) or (fpf == feed_use_default and timediff > config_pf > 0))], context) self.poll_categories([cat for cat, sub, timediff in [(cat, cat.subscription, now - cat.subscription.last_poll) for cat in FeedCategoryList.get_instance().all_categories if cat.subscription is not None] if ((timediff > sub.frequency > 0) or (sub.frequency == sub.REFRESH_DEFAULT and timediff > config_pf > 0))], context, False) except: error.log_exc("Caught an exception while polling") URLFetch.connection_manager.poll(NetworkConstants.POLL_TIMEOUT) return True def poll(self, obj, pollcontext = None): """ obj must be a list of Feed objects""" if pollcontext is None: pollcontext = {} stoppers = [] for f in obj: sf = None if not self._pollers.has_key(f) and not pollcontext.has_key(f): self._pollers[f] = True pollcontext[f] = True sf = FeedPoller(f).poll() stoppers.append(sf) return stoppers def poll_categories(self, cats, pollcontext = None, feeds = True): """Polls categories and, if feeds is True, the feeds associated with them. Returns a list of list containing PollStopper objects. poll_categories([c1, c2, c3]) -> [[sc1, sc1f1, sc1f2 ...], [sc2, sc2f1, sc2f2 ...], ...] where cN is a category, scN is a PollStopper for cN, scNfM is a stopper for a feed in cN. If no polling was started for that cN or cNfM, in its place is None. """ stoppers = [] if pollcontext is None: pollcontext = {} for c in cats: category_stoppers = [] sc = None if c.subscription is not None and not self._pollers.has_key(c) and not pollcontext.has_key(c): self._pollers[c] = True pollcontext[c] = True sc = CategoryPoller(c, pollcontext).poll() category_stoppers.append(sc) if feeds: for f in c.feeds: sf = None if not self._pollers.has_key(f) and not pollcontext.has_key(f): self._pollers[f] = True pollcontext[f] = True sf = FeedPoller(f).poll(), f category_stoppers.append(sf) stoppers.append(category_stoppers) return stoppers def poll_done(self, obj): if self._pollers.has_key(obj): del self._pollers[obj] class FeedPoller: def __init__(self, feed): self._feed = feed self._mmgr = MessageManager.get_instance() def poll(self): MainloopManager.get_instance().call_pending() url, user, pw = self._feed.access_info parsed = None headers = {} config = Config.get_instance() if not config.no_etags and self._feed.previous_etag is not None: headers['If-None-Match'] = self._feed.previous_etag self._feed.last_poll = int (time.time()) ps = None try: try: stopper = URLFetch.connection_manager.request( url, self, headers, user, pw, priority=NetworkConstants.PRIORITY_RSS) ps = PollStopper(stopper, self._feed) except Exception, e: error.log_exc("Caught an exception while polling") self.http_failed(e) else: self._feed.router.start_polling() self._feed.router.stopper = ps finally: return ps def http_results(self, status, header, data): MainloopManager.get_instance().call_pending() try: try: import pdb #pdb.set_trace() self._feed.poll_done() err = "" read_data = True if status is None: err = _("No data") elif status[1] == 304: self._feed.router.route_no_data() read_data = False elif status[1] == 410 or status[1] == 404: err = _("Unable to find the feed (%s: %s)") % ( status[1], status[2].strip()) elif status[1] == 401: err = _("Invalid username and password.") elif status[1] > 299: err = _("Updating feed resulted in abnormal status '%s' (code %d)") % ( status[2].strip(), status[1]) if err: self._feed.router.set_error(err) elif read_data: try: parsed = SummaryParser.parse(data, self._feed) except Exception, e: error.log_exc("exception in summaryparser") self._feed.router.set_error( _("An error occurred while processing feed: %s") % str(e)) else: self._feed.router.route_all(header, parsed) self._mmgr.post_message(_("Updating %s done.") % self._feed.title) except Exception, ex: error.log_exc("error while parsing results") self._feed.router.set_error(str(ex)) finally: get_instance().poll_done(self._feed) def http_failed(self, exception): try: if isinstance(exception, socket.error): self._feed.router.route_no_data() else: self._feed.router.set_error(str(exception)) self._mmgr.post_message(_("Updating %s failed") % self._feed.title) self._feed.poll_done() finally: get_instance().poll_done(self._feed) def http_permanent_redirect(self, location): (oldloc, u, p) = self._feed.access_info self._feed.access_info = (location, u, p) def operation_stopped(self): self._feed.router.route_no_data() self._feed.poll_done() get_instance().poll_done(self._feed) class CategoryPoller: def __init__(self, category, pollcontext): self._category = category self._mmgr = MessageManager.get_instance() self._pollcontext = pollcontext def poll(self): headers = {} sub = self._category.subscription if sub is None or sub.location is None or len(sub.location) == 0: return None sub.last_poll = int(time.time()) if sub.previous_etag is not None: headers['If-None-Match'] = sub.previous_etag ps = None try: try: stopper = URLFetch.connection_manager.request( sub.location, self, headers, sub.username, sub.password, priority=NetworkConstants.PRIORITY_RSS) ps = PollStopper(stopper, self._category) except Exception, e: error.log_exc("Caught an exception while polling category") finally: return ps def http_results(self, status, header, data): sub = self._category.subscription try: try: err = None if status is None: err = _("No data (%s)" % status[1]) elif status[1] == 410 or status[1] == 404: err = _("Unable to find the category (%s: %s)") % ( status[1], status[2].strip()) elif status[1] == 401: err = _("Invalid username and password.") elif status[1] > 299: err = _("Updating category resulted in abnormal status '%s' (code %d)") % (status[2].strip(), status[1]) if err is not None: self._category.subscription.error = err else: sub.parse(data) old_feeds = self._category.feeds self._category.read_contents_from_subscription() common, inold, innew = utils.listdiff( old_feeds, self._category.feeds) if len(innew) > 0: get_instance().poll(innew, self._pollcontext) except Exception, e: self._category.subscription.error = str(e) finally: get_instance().poll_done(self._category) def http_failed(self, exception): self._category.subscription.error = str(exception) get_instance().poll_done(self._category) def http_permanent_redirect(self, location): error.logparam(locals(), "location") def operation_stopped(self): get_instance().poll_done(self._category) pollmanager_instance = None def get_instance(): global pollmanager_instance if pollmanager_instance is None: pollmanager_instance = PollManager() return pollmanager_instance