""" subscribe.py
Module for handling feed subscription process.
"""
__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
__license__ = """ GNU General Public License
This program is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place - Suite 330, Boston, MA 02111-1307, USA. """
import urlparse
import urllib
from xml.sax import saxutils
import pygtk
pygtk.require('2.0')
import gobject
import gtk
import gtk.glade
import gettext
from Feed import Feed
import FeedList
import feedfinder
import error
import Event
import utils
import SummaryParser
import ParsedSummary
import dialogs
import time
import URLFetch
import NetworkConstants
import MVP
import Config
class FeedLocationView(MVP.GladeView):
    """View for the feed-location page of the subscribe window.

    Reads the URL typed into "subscribe_location_entry", has the presenter
    validate/split it, and asks the presenter to start looking for feeds.
    The ``_on_*`` methods are presumably auto-connected glade signal
    handlers -- confirm against MVP.GladeView.
    """

    def _initialize(self):
        """Cache the widgets this view drives and focus the URL entry."""
        self._window = self._widget.get_widget("subscribe_window")
        self._notebook = self._widget.get_widget("subscribe_notebook")
        self._entry_url = self._widget.get_widget("subscribe_location_entry")
        self._entry_url.grab_focus()

    def _on_subscribe_location_entry_key_press_event(self, widget, event):
        """Start the feed search when Return is pressed in the URL entry."""
        if event.keyval == gtk.keysyms.Return:
            self.find_feed(self._entry_url.get_text())

    def _on_main_ok_clicked(self, *args):
        """Start the feed search with the current contents of the entry."""
        self.find_feed(self._entry_url.get_text())

    def _on_main_cancel_clicked(self, *args):
        """Hide the subscribe window."""
        self._window.hide()

    def find_feed(self, url):
        """Validate ``url`` and hand it to the presenter for polling.

        Shows an error dialog and returns without polling when the URL is
        missing or cannot be parsed.  When the application is offline the
        user is asked first; cancelling returns without polling, otherwise
        the offline flag is flipped and the search proceeds.
        """
        config = Config.get_instance()
        # A whitespace-only entry is as good as no URL at all.
        if not url or not url.strip():
            self._window.hide()
            dialogs.report_error(_("Feed URL Not Provided"),
                                 _("Please provide the URL of the feed you are trying to subscribe to."),
                                 parent=self._window)
            self._window.present()
            self._notebook.set_current_page(Page.LOCATION)
            return
        elif config.offline:
            self._window.hide()
            response = dialogs.report_offline_status(self._window)
            if response == gtk.RESPONSE_CANCEL:
                self._notebook.set_current_page(Page.LOCATION)
                return
            config.offline = not config.offline
            self._window.present()
        try:
            url, uname, pword = self._presenter.split_url(url)
        except (TypeError, ValueError):
            # TypeError: the presenter could not make sense of the URL.
            # ValueError: the credentials part of the URL was malformed.
            dialogs.report_error(_("Bad Feed URL"),
                                 _("Straw wasn't able to parse '%s'") % url,
                                 parent=self._window)
            return
        self._presenter.find_feed(url, uname, pword)

    def report_error(self, scheme):
        """Tell the user that subscribing to ``scheme`` URLs is unsupported."""
        dialogs.report_error(_("Unsupported Scheme"),
                             _("Subscribing to '%s://' is not supported") % scheme,
                             parent=self._window)

    def clear_entry(self):
        """Remove all text from the URL entry."""
        self._entry_url.delete_text(0, -1)

    def set_entry_url(self, url):
        """Replace the contents of the URL entry with ``url``."""
        self._entry_url.set_text(url)
class FeedLocationPresenter(MVP.BasicPresenter):
    """Presenter for the feed-location page.

    Normalises the URL entered by the user, remembers its scheme and
    network location, and asks the model (a poller) to fetch it.
    """

    def _initialize(self):
        """Reset the remembered domain and scheme."""
        self._domain = ""
        self._scheme = None

    def _fix_url(self, url):
        """Return ``url`` split into a 5-tuple, or None if it is unusable.

        Only URLs whose scheme is "http" or "feed" are accepted; anything
        else, including text with no scheme at all, yields None.  A leading
        "feed:" pseudo-scheme is removed and the remainder parsed with
        "http" as the default scheme, so that "feed://example.com/rss",
        "feed:example.com/rss" and "feed:http://example.com/rss" all come
        out as ('http', 'example.com', '/rss', '', '').
        """
        url = url.strip()
        u = urlparse.urlsplit(url)
        if u[0] not in ("http", "feed"):
            return None
        if u[0] == 'feed':
            # Slice the prefix off the string instead of re-splitting the
            # parsed path: depending on the urlsplit() version the host and
            # query of a feed: URL are not part of the path component.
            url = url[len("feed:"):]
            u = urlparse.urlsplit(url, 'http')
        if u[1] == '':
            # No network location was recognised (e.g. "example.com/rss" or
            # "http:example.com/rss").  Munge what is left into "//host/..."
            # so that urlsplit() treats the first segment as the host.
            remainder = url
            if remainder[:len("http:")].lower() == "http:":
                remainder = remainder[len("http:"):]
            u = urlparse.urlsplit("//" + remainder.lstrip("/"), 'http')
        if u[1] == '':
            # Still no host: there is nothing that could be polled.
            return None
        return u

    def split_url(self, url):
        """Split ``url`` into ``(url_without_credentials, username, password)``.

        Also records the scheme and network location of the URL for
        find_feed() and get_feed_domain().

        Raises TypeError when the URL cannot be parsed; the view relies on
        that exception type to report a bad URL.
        """
        parts = self._fix_url(url)
        if parts is None:
            raise TypeError("cannot parse feed URL: %r" % (url,))
        scheme, loc, path, query, fragment = parts
        username, password = "", ""
        # Credentials are everything before the LAST '@' so that an '@'
        # inside the password does not end up in the host name.
        at = loc.rfind('@')
        if at != -1:
            credentials, loc = loc[:at], loc[at + 1:]
            # Split on the first ':' only: the password may contain colons,
            # and a bare "user@host" simply has no password.
            tokens = credentials.split(':', 1)
            username = tokens[0]
            if len(tokens) > 1:
                password = tokens[1]
        self._domain = loc
        self._scheme = scheme
        url = urlparse.urlunsplit((scheme, loc, path, query, fragment))
        return (url, username, password)

    def find_feed(self, url, username="", password=""):
        """Poll ``url`` unless the last split URL had an unsupported scheme."""
        if self._scheme and self._scheme != 'http':
            self._view.report_error(self._scheme)
            # The user was just told this scheme is not supported, so do
            # not go on and poll it anyway.
            return
        self._model.poll(url, username, password)

    def get_feed_domain(self):
        """Return the network location of the last URL given to split_url()."""
        return self._domain

    def clear_entry(self):
        """Clear the URL entry of the view."""
        self.view.clear_entry()

    def set_location(self):
        """Pre-fill the URL entry from the clipboard if it holds an http URL."""
        def _clipboard_cb(cboard, text, data=None):
            if text:
                b = self._fix_url(text)
                if b and b[0] == "http":
                    self.view.set_entry_url(urlparse.urlunsplit(b))
        clipboard = gtk.clipboard_get(selection="CLIPBOARD")
        clipboard.request_text(_clipboard_cb, None)
class AuthView(MVP.GladeView):
    """View for the authentication page: username and password entries."""

    def __init__(self, widget, url):
        """Remember ``url``, the location the credentials are asked for."""
        MVP.GladeView.__init__(self, widget)
        self._url = url

    def _initialize(self):
        """Look up both entries and put the focus on the username."""
        lookup = self._widget.get_widget
        self._username_entry = lookup("subscribe_username_entry")
        self._password_entry = lookup("subscribe_password_entry")
        self._username_entry.grab_focus()

    def _on_auth_ok_clicked(self, *args):
        """Retry the stored URL with the credentials the user typed in."""
        self._presenter.find_feed(self._url,
                                  self._username_entry.get_text(),
                                  self._password_entry.get_text())
class AuthPresenter(FeedLocationPresenter):
    """Presenter for the authentication page."""

    def find_feed(self, url, username, password):
        """Poll ``url`` with the given credentials, without a scheme check."""
        poller = self._model
        poller.poll(url, username, password)
class FeedsTreeView(MVP.GladeView):
    """View wrapping "feeds_treeview", the list of feeds found on a page."""

    def _initialize(self):
        """Give the tree view a toggle column and a markup title column."""
        self._treeview = self._widget.get_widget("feeds_treeview")

        # Tick-box column bound to the row's subscribe flag.
        self._toggle_renderer = gtk.CellRendererToggle()
        subscribe_column = gtk.TreeViewColumn(_('Subscribe'),
                                              self._toggle_renderer,
                                              active=FeedColumn.STATUS_FLAG)
        self._treeview.append_column(subscribe_column)

        # Text column rendered as markup from the row's title.
        title_column = gtk.TreeViewColumn(_('Title'),
                                          gtk.CellRendererText(),
                                          markup=FeedColumn.TITLE)
        self._treeview.append_column(title_column)

    def get_toggle_renderer(self):
        """Return the renderer of the tick-box column."""
        return self._toggle_renderer

    def get_widget(self, name):
        """Return the widget called ``name`` from the glade tree."""
        return self._widget.get_widget(name)

    def _model_set(self):
        """Attach ``self._model`` to the tree view."""
        self._treeview.set_model(self._model)
class FeedsPresenter(MVP.BasicPresenter):
    """Presenter for the feed-selection page."""

    def _initialize(self):
        """Disable the "mf_ok" button and listen for row toggles."""
        self._channel_parser_func = SummaryParser.parse_channel_info
        self._ok_button = self._view.get_widget("mf_ok")
        self._ok_button.set_sensitive(False)
        self._view.get_toggle_renderer().connect("toggled",
                                                 self._feed_toggled)

    def _feed_toggled(self, cell, path):
        """Flip the toggled row; enable OK only while a feed is selected."""
        self._model.toggle_feed(cell, path)
        anything_selected = bool(self._model.get_selected_feeds_count())
        self._ok_button.set_sensitive(anything_selected)

    def set_parser_func(self, func):
        """Replace the function used to extract channel information."""
        self._channel_parser_func = func

    def init_model(self, feeds):
        """Refill the model from ``feeds``, a mapping of URL to fetched data.

        Entries whose parsed content reports an empty version string
        (neither RSS nor Atom) are left out.
        """
        self._model.clear()
        for feed_url, raw in feeds.items():
            summary = ParsedSummary.ParsedSummary()
            parsed_content, encoding = SummaryParser.feedparser_parse(raw)
            if parsed_content.version != "":
                summary = self._channel_parser_func(summary, parsed_content,
                                                    encoding)
                self._model.add_feed(feed_url, summary)
class FeedsModel(gtk.ListStore):
    """List store backing the feed-selection tree view.

    Columns are indexed by the FeedColumn constants.  Besides the rows, the
    model keeps the URLs of the currently ticked feeds.
    """

    def __init__(self):
        gtk.ListStore.__init__(self, gobject.TYPE_OBJECT, gobject.TYPE_STRING,
                               gobject.TYPE_PYOBJECT,
                               gobject.TYPE_BOOLEAN, gobject.TYPE_STRING)
        self._selected_feeds = list()

    def clear(self):
        """Remove every row and forget which feeds were ticked.

        Without resetting the selection, ticks left over from a cancelled
        session would be counted (and subscribed to) in the next one even
        though every new row starts out unticked.
        """
        gtk.ListStore.clear(self)
        del self._selected_feeds[:]

    def add_feed(self, url, parsed):
        """Append an unticked row for ``url`` described by ``parsed``.

        The title cell holds markup built from the escaped title,
        description and URL.
        """
        info = "<b>%s</b>\n%s" % (saxutils.escape(parsed.title),
                                  saxutils.escape(parsed.description))
        title = "%s\n%s" % (info.strip(),
                            saxutils.escape(url))
        iterator = self.append()
        self.set(iterator, FeedColumn.TITLE, title,
                 FeedColumn.PARSEDSUMMARY, parsed,
                 FeedColumn.STATUS_FLAG, False,  # Items not ticked by default
                 FeedColumn.URL, url)

    def toggle_feed(self, cell, path):
        """Flip the tick of the row at ``path`` and update the selection."""
        iterator = self.get_iter((int(path),))
        url = self.get_value(iterator, FeedColumn.URL)
        sticky_value = not self.get_value(iterator, FeedColumn.STATUS_FLAG)
        if sticky_value:
            self._selected_feeds.append(url)
        else:
            self._selected_feeds.remove(url)
        self.set(iterator, FeedColumn.STATUS_FLAG, sticky_value)

    def get_selected_feeds(self):
        """Return the ticked URLs and reset the selection."""
        feeds = self._selected_feeds[:]
        del self._selected_feeds[:]
        return feeds

    def get_selected_feeds_count(self):
        """Return how many feeds are currently ticked."""
        return len(self._selected_feeds)
class Subscribe:
    """Controller for the subscribe dialog.

    Ties together the glade widgets, the location and feed-selection
    presenters and a Poller, and moves the notebook between the location,
    progress, authentication and feed-selection pages as poller signals
    arrive.
    """

    def __init__(self, xml, parent):
        """Wire up the dialog.

        xml    -- glade widget tree that contains "subscribe_window"
        parent -- window the subscribe window is made transient for
        """
        self._widget = xml
        self._stoppers = list()
        # Handler installed by _process_multi_feed() in place of
        # poll_done_cb; None while the regular handler is connected.
        self._multi_feed_cb = None
        self._poller = Poller()
        self._poller.signal_connect(PollStartSignal, self.poll_start_cb)
        self._poller.signal_connect(AuthNeededSignal, self.auth_needed_cb)
        self._poller.signal_connect(PollDoneSignal, self.poll_done_cb)
        self._poller.signal_connect(PollFailedSignal, self.poll_failed_cb)
        self._feed_location_presenter = FeedLocationPresenter(
            model=self._poller, view=FeedLocationView(xml))
        self._feeds_selection_presenter = FeedsPresenter(
            model=FeedsModel(), view=FeedsTreeView(self._widget))
        self._window = self._widget.get_widget("subscribe_window")
        self._window.set_transient_for(parent)
        self._window.connect('delete-event',
                             self.on_subscribe_window_delete_event)
        self._notebook = self._widget.get_widget("subscribe_notebook")
        self._progress_bar = self._widget.get_widget("subscribe_progress_bar")
        self._progress_label = self._widget.get_widget(
            "subscribe_progress_label")
        # Every cancel button closes and resets the dialog.
        for name in ("progress_cancel", "main_cancel",
                     "auth_cancel", "mf_cancel"):
            self._widget.get_widget(name).connect("clicked",
                                                  self.close_window)
        self._widget.get_widget("mf_ok").connect("clicked",
                                                 self.feed_selection_done)
        self._feeds = dict()
        self._username = ""
        self._password = ""
        self._update_progress = False

    def _get_poller(self):
        return self._poller

    poller = property(_get_poller)

    def show(self):
        """Show the dialog on the location page, pre-filled from clipboard."""
        self._notebook.set_current_page(Page.LOCATION)
        self._feed_location_presenter.set_location()
        self._window.show()

    def hide(self):
        """Hide the dialog window."""
        self._window.hide()

    def on_subscribe_window_delete_event(self, *args):
        """Close the dialog; return True so the window is not destroyed."""
        self.close_window()
        return True

    def set_url_dnd(self, url):
        """Show the dialog and immediately start searching ``url``."""
        self._window.show()
        self._feed_location_presenter.view.find_feed(url)

    def _restore_poll_done_handler(self):
        """Undo the PollDoneSignal handler swap of _process_multi_feed()."""
        if self._multi_feed_cb is None:
            return
        self._poller.signal_disconnect(PollDoneSignal, self._multi_feed_cb)
        self._poller.signal_connect(PollDoneSignal, self.poll_done_cb)
        self._multi_feed_cb = None

    def _reset(self):
        """Return the dialog to its initial, empty state."""
        self._notebook.set_current_page(Page.LOCATION)
        self._feeds.clear()
        self._feed_location_presenter.clear_entry()
        self._update_progress = False
        # The dialog object is reused, so the next search must be handled
        # by poll_done_cb again.
        self._restore_poll_done_handler()

    def close_window(self, *args):
        """Reset the dialog, cancel pending requests and hide the window."""
        self._reset()
        self.remove_stoppers()
        self.hide()

    def remove_stoppers(self):
        """Call every stored stopper and forget them."""
        for stopper in self._stoppers:
            if stopper is not None:
                stopper()
        del self._stoppers[:]

    def poll_start_cb(self, event):
        """Switch to the progress page when the poller starts a request."""
        self._stoppers.append(event.stopper)
        self._username, self._password = event.username, event.password
        domain = self._feed_location_presenter.get_feed_domain()
        if self._notebook.get_current_page() != Page.PROGRESS:
            # Translate the template first and interpolate afterwards;
            # a pre-formatted string is never found in the catalog.
            self._window.set_title(_("Searching %s") % domain)
            self._notebook.set_current_page(Page.PROGRESS)
        if not self._update_progress:
            self._start_progress_bar()
        self.progress_set_text(
            _("Searching %s. This may take a while...") % domain)

    def _start_progress_bar(self):
        """Pulse the progress bar every 150 ms until told to stop."""
        self._update_progress = True
        self._progress_bar.set_fraction(0)

        def _update_progress_bar():
            self._progress_bar.pulse()
            # Returning a false value removes the timeout.
            return self._update_progress

        gobject.timeout_add(150, _update_progress_bar)

    def progress_set_text(self, text):
        """Show ``text`` in the progress label."""
        self._progress_label.set_text("%s" % text)

    def auth_needed_cb(self, event):
        """Switch to the authentication page for ``event.url``."""
        self._notebook.set_current_page(Page.AUTH)
        AuthPresenter(view=AuthView(self._widget, event.url),
                      model=self._poller)

    def poll_done_cb(self, event):
        """Inspect fetched data and continue according to the feeds found."""
        self.remove_stoppers()
        try:
            feeds, data, data_is_feed = feed_data_check(event.url, event.data)
        except IOError as ioe:
            # The failure has been reported; do not follow it up with a
            # second, misleading "unable to find feeds" dialog.
            self._display_dialog(_("Unexpected Error Occurred"), str(ioe))
            return
        num_feeds = len(feeds)
        if num_feeds == 1:
            self._process_feed(feeds, data, data_is_feed)
        elif num_feeds > 1:
            self._process_multi_feed(feeds)
        else:
            self._display_dialog(
                _("Unable to Find Feeds"),
                (_("Straw was unable to find feeds in %s") % event.url))

    def _process_feed(self, feeds, data, data_is_feed):
        """Subscribe to the only feed found, fetching it first if needed."""
        feed_url = feeds.pop()
        if data_is_feed:
            self.add_to_subscription(feed_url, data)
            self.close_window()
        else:
            self._poller.poll(feed_url, self._username, self._password)

    def _process_multi_feed(self, feeds):
        """Fetch every candidate in ``feeds``, then show the selection page.

        While the candidates are being fetched, PollDoneSignal is routed to
        a local callback instead of poll_done_cb; the regular handler is put
        back by _restore_poll_done_handler().
        """
        num_feeds = len(feeds)
        # Leftovers of an aborted run would never be filled in and would
        # keep the "unpolled" count above zero forever.
        self._feeds.clear()
        for url in feeds:
            self._feeds[url] = None
        self._poller.signal_disconnect(PollDoneSignal, self.poll_done_cb)

        def mf_cb(event):
            if not feeds:
                return
            # NOTE(review): the data is stored under whichever URL is popped
            # next, not under the URL it was fetched from -- the poller
            # keeps only one current URL, so event.url cannot be used to
            # tell concurrent requests apart.  Verify the pairing.
            self._feeds[feeds.pop()] = event.data
            unpolled = len([v for v in self._feeds.values() if v is None])
            self.progress_set_text(_("Processing %d of %d feeds")
                                   % (num_feeds - unpolled, num_feeds))
            if unpolled == 0:
                self.remove_stoppers()
                self._prepare_feed_selection_page()

        self._multi_feed_cb = mf_cb
        self._poller.signal_connect(PollDoneSignal, mf_cb)
        # Iterate over a copy: mf_cb pops from ``feeds``.
        for feed in feeds[:]:
            self._poller.poll(feed, self._username, self._password)

    def _prepare_feed_selection_page(self):
        """Show the selection page filled with the fetched candidates."""
        self._notebook.set_current_page(Page.FEEDS)
        self._feeds_selection_presenter.init_model(self._feeds)

    def poll_failed_cb(self, event):
        """Stop the progress bar and report why polling failed."""
        text = ""
        self._update_progress = False
        if event.exception is not None:
            text = str(event.exception)
        elif event.message is not None:
            text = event.message
        self._display_dialog(_("Error While Subscribing"), text)

    def _display_dialog(self, title, text):
        """Show an error dialog, then go back to the location page.

        ``title`` and ``text`` are shown as given: callers pass titles that
        are already translated, and running arbitrary text through gettext
        again is at best a no-op (an empty string would even be replaced by
        the catalog header).
        """
        self._window.hide()
        response = dialogs.report_error(title, text, parent=self._window)
        self._window.show()
        if response:
            self.remove_stoppers()
            self._restore_poll_done_handler()
            self._notebook.set_current_page(Page.LOCATION)

    def feed_selection_done(self, event):
        """Subscribe to every ticked feed and close the dialog."""
        sf = self._feeds_selection_presenter.model.get_selected_feeds()
        for url in sf:
            if url in self._feeds:
                self.add_to_subscription(url, self._feeds[url])
        self.close_window()

    def add_to_subscription(self, url, data):
        """Create a feed for ``url`` from ``data`` and add it to the list."""
        feed = Feed.create_new_feed("", url, self._username, self._password)
        parsed = SummaryParser.parse(data, feed)
        if parsed.title == "":
            # store url loc in title in case it's empty
            parsed.title = urlparse.urlsplit(url.strip())[1]
        feed.title = utils.convert_entities(parsed.title)
        feed.channel_description = utils.convert_entities(parsed.description)
        feed.access_info = (url, self._username, self._password)
        feed.last_poll = int(time.time())
        category = None  # Add to 'All' category
        FeedList.get_instance().append(category, feed)
        feed.router.route_all(None, parsed)
        feed.poll_done()
class PollStartSignal(Event.BaseSignal):
    """Signal emitted by Poller.poll() once a request has been handed off.

    Carries the stopper returned by the request handler together with the
    URL and credentials the request was made with.
    """

    def __init__(self, sender, stopper, url, username, password):
        Event.BaseSignal.__init__(self, sender)
        self.url = url
        self.username, self.password = username, password
        self.stopper = stopper
class PollDoneSignal(Event.BaseSignal):
    """Signal emitted by the poller when a request returned usable data."""

    def __init__(self, sender, url, data):
        Event.BaseSignal.__init__(self, sender)
        self.url, self.data = url, data
class AuthNeededSignal(Event.BaseSignal):
    """Signal emitted by the poller when a request was answered with 401."""

    def __init__(self, sender, url):
        # ``url`` is the location that asked for credentials.
        Event.BaseSignal.__init__(self, sender)
        setattr(self, "url", url)
class PollFailedSignal(Event.BaseSignal):
    """Signal emitted by the poller when a request could not be completed.

    Either ``exception`` or ``message`` may describe the failure; both
    default to None.
    """

    def __init__(self, sender, url, exception=None, message=None):
        Event.BaseSignal.__init__(self, sender)
        self.message = message
        self.exception = exception
        self.url = url
class Poller(Event.SignalEmitter):
    """Fetches a URL through the request handler and reports via signals.

    Emits PollStartSignal, AuthNeededSignal, PollDoneSignal and
    PollFailedSignal.  The ``http_*`` and ``operation_stopped`` methods are
    presumably the consumer interface the request handler calls back on --
    confirm against URLFetch.

    Only the most recently polled URL and credentials are remembered.
    """

    def __init__(self):
        Event.SignalEmitter.__init__(self)
        self.initialize_slots(PollStartSignal,
                              AuthNeededSignal,
                              PollDoneSignal,
                              PollFailedSignal)
        self._request_handler = URLFetch.connection_manager
        self._request_priority = NetworkConstants.PRIORITY_RSS
        self._url = None
        self._username = None
        self._password = None

    def set_request_handler(self, handler):
        """Replace the request handler (currently for testing purposes)."""
        self._request_handler = handler

    def set_request_priority(self, priority):
        """Set the priority passed along with every request."""
        self._request_priority = priority

    def poll(self, url, username="", password=""):
        """Request ``url`` and emit PollStartSignal on success.

        If handing the request to the handler raises, PollFailedSignal is
        emitted with the exception instead.
        """
        url = url.strip()
        self._url = feedfinder.makeFullURI(url)
        self._username = username
        self._password = password
        headers = {}
        try:
            stopper = self._request_handler.request(
                self._url, self, headers, self._username, self._password,
                priority=self._request_priority)
        except Exception as e:
            self.http_failed(e)
        else:
            self.emit_signal(PollStartSignal(self, stopper, self._url,
                                             self._username, self._password))

    def http_results(self, status, header, data):
        """Turn an HTTP response into the matching signal.

        status -- None, or a sequence whose item 1 is the status code and
                  item 2 the reason text
        header -- unused here
        data   -- response body, passed on with PollDoneSignal
        """
        if status is None:
            self.http_failed(msg=_("No Data"))
            return
        code = status[1]
        if code == 401:
            self.emit_signal(AuthNeededSignal(self, self._url))
            return
        # Every other 4xx/5xx status is a failure.  This includes 400,
        # whose body is an error page rather than feed data.
        if code >= 400:
            self.http_failed(msg="%s: %s" % (code, status[2].strip()))
            return
        self.emit_signal(PollDoneSignal(self, self._url, data))

    def http_failed(self, exception=None, msg=None):
        """Emit PollFailedSignal for the current URL."""
        self.emit_signal(PollFailedSignal(self, self._url, exception, msg))

    def http_permanent_redirect(self, newloc):
        """Poll ``newloc`` with the credentials of the current request."""
        self._url = newloc
        self.poll(self._url, self._username, self._password)

    def operation_stopped(self):
        """Nothing to do when a request is cancelled."""
        pass
class Page:
    """Indices of the pages of "subscribe_notebook"."""
    # location entry, progress display, authentication, feed selection
    LOCATION, PROGRESS, AUTH, FEEDS = range(4)
class FeedColumn:
    """Column indices of the feed-selection list store."""
    # In store order: object, title markup, parsed summary,
    # subscribe tick, feed URL.
    SUBSCRIBE, TITLE, PARSEDSUMMARY, STATUS_FLAG, URL = range(5)
def feed_data_check(url, data):
    """Work out which feeds ``data``, fetched from ``url``, leads to.

    Returns ``(feeds, data, data_is_feed)``.  When ``data`` itself could be
    feed data, ``feeds`` is just ``[url]`` and the flag is True.  Otherwise
    the page's links are searched, falling back step by step to anchors on
    the same site and then to all anchors, and the flag is False.
    """
    if feedfinder.couldBeFeedData(data):
        return (uniq([url]), data, True)

    found = filter(feedfinder.isFeed, feedfinder.getLinks(data, url))
    if not found:
        anchors = feedfinder.getALinks(data, url)
        local_anchors = feedfinder.getLocalLinks(anchors, url)
        # XXX: feedfinder.isfeed() blocks.
        # Try each (link test, link pool) pair in turn and stop at the
        # first one that yields a verified feed.
        fallbacks = ((feedfinder.isFeedLink, local_anchors),
                     (feedfinder.isXMLRelatedLink, local_anchors),
                     (feedfinder.isFeedLink, anchors),
                     (feedfinder.isXMLRelatedLink, anchors))
        for link_test, pool in fallbacks:
            found = filter(feedfinder.isFeed, filter(link_test, pool))
            if found:
                break
    return (uniq(found), data, False)
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52560
# Author: Raymond Hettinger
def uniq(alist):    # Fastest without order preserving
    """Return a new list holding each distinct item of ``alist`` once.

    Items must be hashable.  Callers should not rely on the order of the
    result.
    """
    # dict.fromkeys() builds the key set directly; unlike map() with an
    # empty second sequence it does not depend on map() padding with None,
    # and it avoids shadowing the builtin ``set``.
    return list(dict.fromkeys(alist))
# The one Subscribe dialog of the application, created on first use.
instance = None


def _get_instance(parent=None):
    """Return the shared Subscribe dialog, building it on the first call.

    ``parent`` is only used when the dialog is created.
    """
    global instance
    if instance is not None:
        return instance
    xml = gtk.glade.XML(utils.find_glade_file(), "subscribe_window",
                        gettext.textdomain())
    instance = Subscribe(xml, parent)
    return instance
def show(parent=None, url=None):
    """Show the subscribe dialog.

    With ``url`` given the dialog starts searching that URL right away;
    otherwise it opens on the location page.
    """
    dialog = _get_instance(parent)
    if url is not None:
        dialog.set_url_dnd(url)
        return
    dialog.show()
# syntax highlighted by Code2HTML, v. 0.9.1