""" ItemStore.py Data store abstraction module. """ __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc." __license__ = """ Straw is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. Straw is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ import cPickle as pickle import os, sys import time from error import * import tempfile import traceback try: from bsddb.db import * import bsddb except ImportError: from bsddb3.db import * import bsddb3 as bsddb import Event import SummaryItem import FeedList import ImageCache from MainloopManager import MainloopManager DATABASE_FILE_NAME = "itemstore.db" class ConvertException(Exception): def __init__(self, version1, version2, reason): self.version1 = version1 self.version2 = version2 self.reason = reason class MyDB: CURRENT_VERSION = 3 def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600, recover=0, dbflags=0): self._db = None self._env = None recoverenv = DB_CREATE | DB_RECOVER # DB_INIT_TXN automatically enables logging flagsforenv = DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE self._env = DBEnv() self._env.set_data_dir(dbhome) self._env.set_lk_detect(DB_LOCK_DEFAULT) # enable auto deadlock avoidance self._env.set_lg_max(2**20) self._env.set_lk_max_locks (10000) self._env.set_lk_max_objects (10000) try: self._env.open(dbhome, recoverenv | flagsforenv, mode) except bsddb._db.DBRunRecoveryError, err: self._env.remove(dbhome) self._env.close() log("%s" % err[1]) sys.exit("Recovery Error: See README for details on how to recover data. ") flags = 0 if truncate: flags |= DB_TRUNCATE try: flags |= DB_AUTO_COMMIT except NameError: pass try: self._db = DB(self._env) self._db.open(filename, DB_BTREE, flags, mode) except bsddb._db.DBNoSuchFileError: if create: self._db = DB(self._env) self._db.open(filename, DB_BTREE, flags | DB_CREATE, mode) self.set_db_version(self.CURRENT_VERSION) else: raise try: self.convert_old_versions() except Exception, ex: try: filename = tempfile.mktemp(prefix="straw-") fh = open(filename, "w") traceback.print_exc(None, fh) raise ConvertException(self.get_db_version(), MyDB.CURRENT_VERSION, "%s" % filename) finally: fh.close() def close(self): if self._db is not None: self._db.close() self._db = None if self._env is not None: self._env.close() self._env = None def checkpoint(self): # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError) cpflags = 0 self._env.txn_checkpoint(cpflags | DB_FORCE ) deletees = self._env.log_archive(DB_ARCH_ABS) for d in deletees: os.remove(d) def begin_transaction(self): return self._env.txn_begin() def get_item_ids(self, iid, txn): key = "fids:%d" % iid dids = self._db.get(key, txn=txn) ids = [] if dids: ids = pickle.loads(dids) return ids def save_feed_item_ids(self, feed, ids, txn=None): rowid = "fids:%d" % feed.id commit = 0 if not txn: txn = self.begin_transaction() commit = 1 try: try: self._db.delete(rowid, txn=txn) except DBNotFoundError: pass self._db.put(rowid, pickle.dumps(ids), txn=txn) except Exception, ex: if commit: txn.abort() logtb(str(ex)) else: if commit: txn.commit() def get_item(self, feed_id, item_id, txn=None): item = self._db.get("%d:%d" % (feed_id, item_id), txn=txn) return unstringify_item(item) def add_items(self, feed, items): txn = self.begin_transaction() try: feed_item_ids = self.get_item_ids(feed.id, txn=txn) for item in items: self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn) # TODO: it might be a good idea to check here that we don't add # duplicate items. It doesn't happen normally, but there can be # bugs that trigger that. Throwing an exception would be the # the right thing: it wouldn't hide the breakage. feed_item_ids.append(item.id) self.save_feed_item_ids(feed, feed_item_ids, txn) except Exception, ex: txn.abort() logtb(str(ex)) else: txn.commit() def delete_items(self, feed, items): """ Deletes a list of items. Useful for cutting old items based on number of items stored. """ txn = self.begin_transaction() try: feed_item_ids = self.get_item_ids(feed.id, txn=txn) # because of bugs, we sometime get here duplicate ids. instead of dying, # warn the user but continue item_ids = [] for item in items: item.clean_up() if item.id in item_ids: log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item.title, item.id)) # filter out any duplicates feed_item_ids = filter(lambda x: x != item.id, feed_item_ids) continue item_ids.append(item.id) #log("deleting item %d:%d" % (feed.id, item.id)) if item.id in feed_item_ids: feed_item_ids.remove(item.id) self._db.delete("%d:%d" % (feed.id, item.id), txn=txn) self.save_feed_item_ids(feed, feed_item_ids, txn) except Exception, ex: txn.abort() log_exc("error while deleting items") else: txn.commit() def modify_items(self, items): txn = self.begin_transaction() try: for item in items: self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn) except Exception, ex: txn.abort() logtb(str(ex)) else: txn.commit() def get_feed_items(self, feed): txn = self.begin_transaction() items = [] try: ids = self.get_item_ids(feed.id, txn=txn) for id in ids: item = self.get_item(feed.id, id, txn=txn) if item is not None: items.append(item) except Exception, ex: txn.abort() log(str(ex)) else: txn.commit() return items def get_number_of_unread(self, fid, cutoff): # Used by config conversion # NOTE: this is the number of unread items in 'number of items stored' # preference. Since straw stores the most recent items down the list, # we only count the unread items from the most recent N items, # where N = cutoff. txn = self.begin_transaction() num_unread = 0 try: ids = self.get_item_ids(fid, txn=txn) for id in ids[len(ids)-cutoff:]: item = self.get_item(fid, id, txn=txn) if item is not None and item.seen == 0: num_unread += 1 else: continue except Exception, ex: txn.abort() logtb(str(ex)) else: txn.commit() return num_unread def get_image_urls(self, txn=None): dkeys = self._db.get("images", txn=txn) keys = [] if dkeys is not None: keys = pickle.loads(dkeys) return keys def save_image_urls(self, urls, txn=None): self._db.put("images", pickle.dumps(urls), txn=txn) def get_image_counts(self, txn=None): images = self.get_image_urls(txn) counts = [] for image in images: key = ("imagecount:" + image).encode('utf-8') value = self._db.get(str(key)) try: counts.append((image, int(value))) except: log("exception for ", key, ", type of value ", value, ": ", type(value)) return counts def update_image_count(self, url, count): #logparam(locals(), "url", "count") key = ("imagecount:" + url).encode('utf-8') txn = self.begin_transaction() try: if count < 1: self._db.delete(key, txn=txn) else: self._db.put(key, str(count), txn=txn) except: txn.abort() raise else: txn.commit() def update_image(self, url, image): key = "image:%s" % str(url) txn = self.begin_transaction() try: image_urls = self.get_image_urls(txn) if image is not None: self._db.put(key.encode('utf-8'), image, txn=txn) if url not in image_urls: image_urls.append(url) self.save_image_urls(image_urls, txn) else: if url in image_urls: try: self._db.delete(key, txn=txn) except DBNotFoundError: log("Key not found", key) image_urls.remove(url) self.save_image_urls(image_urls, txn=txn) except: txn.abort() raise else: txn.commit() def get_image_data(self, url, txn=None): return self._db.get( "image:%s" % url.encode('utf-8'), default = None, txn=txn) def _image_print(self, key, data): if key[:6] == "image:": print key def _data_print(self, key, data): data = pickle.loads(data) pprint ({key: data}) def _db_print(self, helper): """Print the database to stdout for debugging""" print "******** Printing raw database for debugging ********" print "database version: %s" % self.get_db_version() cur = self._db.cursor() try: key, data = cur.first() while 1 : helper(key, data) next = cur.next() if next: key, data = next finally: cur.close() def get_db_version(self, txn=None): version = self._db.get("straw_db_version", default = "1", txn=txn) return int(version) def set_db_version(self, version, txn=None): try: if txn is None: txn = self.begin_transaction() self._db.put("straw_db_version", str(version), txn=txn) except: txn.abort() raise else: txn.commit() def convert_old_versions(self): version = self.get_db_version() while version < self.CURRENT_VERSION: next = version + 1 mname = "convert_%d_%d" % (version, next) try: method = getattr(self, mname) except AttributeError: raise ConvertException(version, next, "No conversion function specified") method() self.set_db_version(next) version = next def convert_1_2(self): def is_item(key): parts = key.split(':') if len(parts) != 2: return False return parts[0].isdigit() and parts[1].isdigit() def round_second(ttuple): l = list(ttuple) l[5] = int(round(l[5])) return tuple(l) try: import mx.DateTime as mxd except ImportError: raise ConvertException(1, 2, _("Couldn't import mx.DateTime")) txn = self.begin_transaction() try: cur = self._db.cursor(txn=txn) try: next = cur.first() key = None if next: key, data = cur.first() while key is not None: if is_item(key): dict = pickle.loads(data) if isinstance(dict['pub_date'], mxd.DateTimeType): p = dict['pub_date'] t = time.gmtime(time.mktime(round_second(p.tuple()))) dict['pub_date'] = t data = pickle.dumps(dict) cur.put(key, data, DB_CURRENT) next = cur.next() if next: key, data = next else: break finally: cur.close() except Exception, ex: txn.abort() raise else: txn.commit() def convert_2_3(self): def is_item(key): parts = key.split(':') if len(parts) != 2: return False return parts[0].isdigit() and parts[1].isdigit() imagelistcursor = None images = {} txn = self.begin_transaction() try: cur = self._db.cursor(txn=txn) try: next = cur.first() key = None if next: key, data = cur.first() while key is not None: if is_item(key): dic = pickle.loads(data) for image in dic['images']: images[image] = images.get(image, 0) + 1 elif key == "images": imagelistcursor = cur.dup(DB_POSITION) next = cur.next() if next: key, data = next else: break for image, count in images.items(): key = ("imagecount:" + image).encode('utf-8') cur.put(key, str(count), DB_KEYFIRST) imagelistcursor.put("images", pickle.dumps(images.keys()), DB_CURRENT) finally: cur.close() if imagelistcursor != None: imagelistcursor.close() except Exception, ex: txn.abort() raise else: txn.commit() class ModifyItemAction: def __init__(self, item): self._item = item def doit(self, db): db.modify_items([self._item]) class ModifyItemsAction: def __init__(self, items): self._items = items def doit(self, db): db.modify_items(self._items) class ItemsAddedAction: def __init__(self, feed, items): self._feed = feed self._items = items def doit(self, db): db.add_items(self._feed, self._items) class DeleteItemAction: def __init__(self, feed, items): self._feed = feed self._items = items def doit(self, db): db.delete_items(self._feed, self._items) class ImageUpdateAction: def __init__(self, url, image): self._url = url self._image = image def doit(self, db): db.update_image(self._url, self._image) class ImageCountChangedAction: def __init__(self, url, count): self._url = url self._count = count def doit(self, db): db.update_image_count(self._url, self._count) class ItemStore: def __init__(self, dbhome): feedlist = FeedList.get_instance() self._db = MyDB(DATABASE_FILE_NAME, dbhome, create = 1) self.connect_signals() feedlist.signal_connect(Event.FeedCreatedSignal, self._feed_created_cb) feedlist.signal_connect(Event.FeedDeletedSignal, self._feed_deleted_cb) ImageCache.cache.signal_connect(Event.ImageUpdatedSignal, self.image_updated) self._stop = False self._action_queue = [] def _feed_created_cb(self, signal): self._connect_feed_signals(signal.feed) def _feed_deleted_cb(self, signal): self._disconnect_feed_signals(signal.feed) def connect_signals(self): flist = FeedList.get_instance().flatten_list() for f in flist: self._connect_feed_signals(f) def _connect_feed_signals(self, feed): feed.signal_connect(Event.NewItemsSignal, self.items_added) feed.signal_connect(Event.ItemReadSignal, self.item_modified) feed.signal_connect(Event.ItemStickySignal, self.item_modified) feed.signal_connect(Event.AllItemsReadSignal, self.all_items_read) feed.signal_connect(Event.ItemDeletedSignal, self.item_deleted) def _disconnect_feed_signals(self, feed): feed.signal_disconnect(Event.NewItemsSignal, self.items_added) feed.signal_disconnect(Event.ItemReadSignal, self.item_modified) feed.signal_disconnect(Event.ItemStickySignal, self.item_modified) feed.signal_disconnect(Event.AllItemsReadSignal, self.all_items_read) feed.signal_disconnect(Event.ItemDeletedSignal, self.item_deleted) def modify_item(self, item): self._action_queue.append(ModifyItemAction(item)) return def image_updated(self, signal): self._action_queue.append( ImageUpdateAction(signal.url, signal.data)) def read_image(self, url): return self._db.get_image_data(url) def item_deleted(self, signal): self._action_queue.append(DeleteItemAction(signal.sender, signal.item)) def item_modified(self, signal): self.modify_item(signal.item) def all_items_read(self, signal): self._action_queue.append(ModifyItemsAction( [item for index, item in signal.changed])) def items_added(self, signal): self._action_queue.append( ItemsAddedAction(signal.sender, signal.items)) def read_feed_items(self, feed): return self._db.get_feed_items(feed) def get_number_of_unread(self, feed_id, cutoff): return self._db.get_number_of_unread(feed_id, cutoff) def get_image_counts(self): return self._db.get_image_counts() def set_image_count(self, image, count): self._action_queue.append( ImageCountChangedAction(image, count)) def start(self): mlmgr = MainloopManager.get_instance() mlmgr.set_repeating_timer(5000, self._run) def stop(self): mlmgr = MainloopManager.get_instance() mlmgr.end_repeating_timer(self._run) self._db.checkpoint() self._db.close() self._stop = True def _run(self): self._db.checkpoint() freq = 5 timer = freq cpfreq = 60 cptimer = cpfreq prevtime = time.time() if not self._stop: tmptime = time.time() timer += tmptime - prevtime cptimer += tmptime - prevtime prevtime = tmptime if timer > freq: try: while len(self._action_queue): action = self._action_queue.pop(0) if action is None: break action.doit(self._db) except IndexError, e: pass timer = 0 if cptimer > cpfreq: self._db.checkpoint() cptimer = 0 itemstore_instance = None def get_instance(straw_dir=None): global itemstore_instance if itemstore_instance is None: if straw_dir is None: straw_dir = os.path.join(os.getenv('HOME'), ".straw") itemstore_instance = ItemStore(straw_dir) return itemstore_instance def stringify_item(item): itemdict = { 'title': item.title, 'link': item.link, 'description': item.description, 'guid': item.guid, 'guidislink': item.guidislink, 'pub_date': item.pub_date, 'source': item.source, 'images': item.image_keys(), 'seen': item.seen, 'id': item.id, 'fm_license': item.fm_license, 'fm_changes': item.fm_changes, 'creator': item.creator, 'contributors': item.contributors, 'license_urls': item.license_urls, 'publication_name': item.publication_name, 'publication_volume': item.publication_volume, 'publication_number': item.publication_number, 'publication_section': item.publication_section, 'publication_starting_page': item.publication_starting_page, 'sticky': item._sticky, 'enclosures': item.enclosures} return pickle.dumps(itemdict) def unstringify_item(itemstring): if not itemstring: return None idict = _unpickle(itemstring) if not idict: return None item = SummaryItem.SummaryItem() item.title = idict['title'] item.link = idict['link'] item.description = idict['description'] item.guid = idict['guid'] item.pub_date = idict['pub_date'] item.source = idict['source'] for i in idict['images']: item.restore_image(i) item.seen = idict['seen'] item.id = idict['id'] item.guidislink = idict.get('guidislink', True) item.fm_license = idict.get('fm_license', None) item.fm_changes = idict.get('fm_changes', None) item.creator = idict.get('creator', None) item.contributors = idict.get('contributors', None) item.license_urls = idict.get('license_urls', None) item._sticky = idict.get('sticky', 0) item.enclosures = idict.get('enclosures', None) item.publication_name = idict.get('publication_name', None) item.publication_volume = idict.get('publication_volume', None) item.publication_number = idict.get('publication_number', None) item.publication_section = idict.get('publication_section', None) item.publication_starting_page = idict.get('publication_starting_page', None) return item def _unpickle(istring): itemdict = None try: itemdict = pickle.loads(istring) except ValueError, ve: log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring)) except Exception, ex: logtb(str(ex)) return itemdict if __name__ == '__main__': from pprint import pprint db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create = 1) db._db_print(db._data_print)