# Written by Jie Yang # see LICENSE.txt for license information """ Database design Value in bracket is the default value Don't use None as a default value MyDB - (PeerDB) mydata.bsd: # future keys: pictures, version: int (curr_version) # required permid: str # required ip: str ('') port: int (0) name: str ('Swapper') torrent_path: str ('') # default path to store torrents prefxchg_queue: list ([]) # permid bootstrapping: int (1) max_num_torrents: int (100000) max_num_my_preferences: int (1000) superpeers: Set([permid]) friends: Set([permid]) PeerDB - (MyFriendDB, PreferenceDB, OwnerDB) peers.bsd: # future keys: sys_trust, reliablity, speed, personal_info, .. permid:{ ip: str ('') port: int (0) # listening port, even behind firewall name: str ('unknown') last_seen: int (0) similarity: int (0) # [0, 1000] connected_times: int(0) # times to connect the peer successfully tried_times: int(0) # times to attempt to connect the peer buddycast_times: int(0) # times to receive buddycast message #relability (uptime, IP fixed/changing) #trust: int (0) # [0, 100] #icon: str ('') # name + '_' + permid[-4:] } TorrentDB - (PreferenceDB, MyPreference, OwnerDB) torrents.bsd: # future keys: names, tags, trackers, .. infohash:{ relevance: int (0) # [0, 1000] torrent_name: str ('') # torrent name torrent_dir: str ('') # path of the torrent (without the file name). '\x01' for default path info: dict ({}) # {name, length, announce, creation date, comment, announce-list, num_files} leecher: int (0) seeder: int (0) } PreferenceDB - (PeerDB, TorrentDB) # other peers' preferences preferences.bsd: permid:{ torrent_id:{'relevance': int (0), 'rank': int (0)} # re: [0, 1000], rank: [-1, 5] } MyPreferenceDB - (TorrentDB) mypreferences.bsd: # future keys: speed infohash:{ created_time: int (0) # time to start download/upload the torrent content_name: str ('') # real file name in disk, may be different with info['name'] content_dir: str ('') # content_dir + content_name = full path rank: int (0) # [-1, 5], # -1 means it is a fake torrent last_seen: int (0) } OwnerDB - (PeerDB, TorrentDB) owner.bsd: infohash: Set([permid]) # future keys: tags, name """ import os, sys from time import time, ctime from random import random from sha import sha from copy import deepcopy from sets import Set from traceback import print_exc, print_stack from threading import currentThread #from Swapper.utilities import isValidPermid, isValidInfohash try: # For Python 2.3 from bsddb import db, dbshelve, dbutils except ImportError: # For earlier Pythons w/distutils pybsddb from bsddb3 import db, dbshelve, dbutils from shelve import BsdDbShelf #permid_len = 0 #112 #infohash_len = 20 # home_dir = 'bsddb' curr_version = 1 permid_length = 112 infohash_length = 20 torrent_id_length = 20 MAX_RETRIES = 12 STRICT_CHECK = False DEBUG = False def isValidPermid(permid): # validate permid in outer layer return True def isValidInfohash(infohash): return True def init(config_dir, myinfo): """ create all databases """ global home_dir home_dir = make_filename(config_dir, 'bsddb') MyDB.getInstance(myinfo, home_dir) PeerDB.getInstance(home_dir) TorrentDB.getInstance(home_dir) PreferenceDB.getInstance(home_dir) MyPreferenceDB.getInstance(home_dir) OwnerDB.getInstance(home_dir) def done(config_dir): MyDB.getInstance().close() MyPreferenceDB.getInstance().close() OwnerDB.getInstance().close() PeerDB.getInstance().close() PreferenceDB.getInstance().close() TorrentDB.getInstance().close() def make_filename(config_dir,filename): if config_dir is None: return filename else: return os.path.join(config_dir,filename) def setDBPath(db_dir = ''): if not db_dir: db_dir = '.' if not os.access(db_dir, os.F_OK): try: os.mkdir(db_dir) except os.error, msg: print >> sys.stderr, "cachedb: cannot set db path:", msg db_dir = '.' return db_dir def open_db2(filename, db_dir='', filetype=db.DB_BTREE): # backup global home_dir if not db_dir: db_dir = home_dir dir = setDBPath(db_dir) path = os.path.join(dir, filename) try: d = dbshelve.open(path, filetype=filetype) except Exception, msg: print >> sys.stderr, "cachedb: cannot open dbshelve on", path, msg d = dbshelve.open(filename, filetype=filetype) return d def open_db(filename, db_dir='', filetype=db.DB_BTREE, writeback=False): global home_dir if not db_dir: db_dir = home_dir dir = setDBPath(db_dir) path = os.path.join(dir, filename) env = db.DBEnv() # Concurrent Data Store env.open(dir, db.DB_THREAD|db.DB_INIT_CDB|db.DB_INIT_MPOOL|db.DB_CREATE|db.DB_PRIVATE) #d = db.DB(env) #d.open(path, filetype, db.DB_THREAD|db.DB_CREATE) #_db = BsdDbShelf(d, writeback=writeback) _db = dbshelve.open(filename, flags=db.DB_THREAD|db.DB_CREATE, filetype=filetype, dbenv=env) return _db def validDict(data, keylen=0): # basic requirement for a data item in DB if not isinstance(data, dict): return False for key in data: if not isinstance(key, str): return False if STRICT_CHECK and keylen and len(key) != keylen: return False return True def validList(data, keylen=0): if not isinstance(data, list): return False for key in data: if not isinstance(key, str): return False if STRICT_CHECK and keylen and len(key) != keylen: return False return True # Abstract base calss class BasicDB: # Should we use delegation instead of inheritance? def __init__(self, db_dir=''): self.default_item = {'d':1, 'e':'abc', 'f':{'k':'v'}, 'g':[1,'2']} # for test if self.__class__ == BasicDB: self.db_name = 'basic.bsd' # for testing self.opened = True self._data = open_db(self.db_name, db_dir, filetype=db.DB_HASH) #raise NotImplementedError, "Cannot create object of class BasicDB" #------------ Basic interfaces, used by member func and handlers -------------# def __del__(self): self.close() threadnames = {} def _put(self, key, value): # write try: if DEBUG: name = currentThread().getName() if name not in self.threadnames: self.threadnames[name] = 0 self.threadnames[name] += 1 print >> sys.stderr, "cachedb: put", len(self.threadnames), name, \ self.threadnames[name], time(), self.__class__.__name__ dbutils.DeadlockWrap(self._data.put, key, value, max_retries=MAX_RETRIES) #self._data.put(key, value) except: pass def _has_key(self, key): # find a key try: return dbutils.DeadlockWrap(self._data.has_key, key, max_retries=MAX_RETRIES) #return self._data.has_key(key) except: return False def _get(self, key, value=None): # read try: return dbutils.DeadlockWrap(self._data.get, key, value, max_retries=MAX_RETRIES) #return self._data.get(key, value) except: print >> sys.stderr, "cachedb: _get EXCEPTION BY",currentThread().getName() print_exc() return None def _updateItem(self, key, data): try: x = self._get(key) if isinstance(x, dict): x.update(data) else: x = data self._put(key, x) except: print_exc() def _delete(self, key): try: if DEBUG: name = currentThread().getName() if name not in self.threadnames: self.threadnames[name] = 0 self.threadnames[name] += 1 print >> sys.stderr, "cachedb: del", len(self.threadnames), name, \ self.threadnames[name], time(), self.__class__.__name__ dbutils.DeadlockWrap(self._data.delete, key, max_retries=MAX_RETRIES) #self._data.delete(key) except: pass def _sync(self): # write data from mem to disk dbutils.DeadlockWrap(self._data.sync, max_retries=MAX_RETRIES) #self._data.sync() def _clear(self): dbutils.DeadlockWrap(self._data.clear, max_retries=MAX_RETRIES) #self._data.clear() def _keys(self): return dbutils.DeadlockWrap(self._data.keys, max_retries=MAX_RETRIES) #return self._data.keys() def _values(self): return dbutils.DeadlockWrap(self._data.values, max_retries=MAX_RETRIES) #return self._data.values() def _items(self): return dbutils.DeadlockWrap(self._data.items, max_retries=MAX_RETRIES) #return self._data.items() def _size(self): try: return dbutils.DeadlockWrap(len, self._data, max_retries=MAX_RETRIES) #return len(self._data) except: print_exc() print >> sys.stderr, "cachedb: cachedb.BasicDB._size error", self.__class__.__name__ return 0 def close(self): if DEBUG: print >> sys.stderr, "cachedb: Closing database",self.db_name,currentThread().getName() if self.opened: try: self._sync() dbutils.DeadlockWrap(self._data.close, max_retries=MAX_RETRIES) if DEBUG: print >> sys.stderr, "cachedb: Done waiting for database close",self.db_name,currentThread().getName() #self._data.close() except: print_exc() self.opened = False def updateDB(self, old_version): raise NotImplementedError def setDefaultItem(self, item): df = deepcopy(self.default_item) df.update(item) return df class MyDB(BasicDB): __single = None def __init__(self, myinfo=None, db_dir=''): if MyDB.__single: raise RuntimeError, "MyDB is singleton" self.db_name = 'mydata.bsd' self.opened = True self._data = open_db(self.db_name, db_dir, filetype=db.DB_HASH) # dbshelve object MyDB.__single = self self.default_data = { 'version':curr_version, 'permid':'', 'ip':'', 'port':0, 'name':'Swapper', 'torrent_path':'', 'prefxchg_queue':[], 'bootstrapping':1, 'max_num_torrents':100000, 'max_num_my_preferences':1000, 'superpeers':Set(), 'friends':Set(), } self.preload_keys = ['ip', 'torrent_path', 'permid'] # these keys can be changed at each bootstrap self.initData(myinfo) def getInstance(*args, **kw): if MyDB.__single is None: MyDB(*args, **kw) if MyDB.__single._size() < len(MyDB.__single.default_data): MyDB.__single.initData() return MyDB.__single getInstance = staticmethod(getInstance) def setDefault(self, data): # it is only used by validData() dd = deepcopy(self.default_data) dd.update(data) return dd def initData(self, myinfo=None): MyDB.checkVersion(self) if not myinfo: myinfo = {} myinfo = self.setDefault(myinfo) self.load(myinfo) def load(self, myinfo): for key in myinfo: if not self._has_key(key) or key in self.preload_keys: # right? self._put(key, myinfo[key]) def checkVersion(db): if not MyDB.__single: MyDB() # it should never be entered old_version = MyDB.__single._get('version') if not old_version: MyDB.__single._put('version', curr_version) elif old_version < curr_version: db.updateDB(old_version) elif old_version > curr_version: raise RuntimeError, "The version of database is too high. Please update the software." checkVersion = staticmethod(checkVersion) # superpeers def addSuperPeer(self, permid): if isValidPermid(permid): sp = self._get('superpeers') sp.add(permid) self._put('superpeers', sp) def deleteSuperPeer(self, permid): if isValidPermid(permid): try: sp = self._get('superpeers') sp.remove(permid) self._put('superpeers', sp) except: pass def isSuperPeer(self, permid): return permid in self._get('superpeers') def getSuperPeers(self): superpeers = self._get('superpeers') if superpeers is not None: return list(superpeers) else: return [] # friends def addFriend(self, permid): if isValidPermid(permid): if not 'friends' in self._keys(): print >> sys.stderr, "cachedb: addFriend key error", self._keys() fr = self._get('friends') fr.add(permid) self._put('friends', fr) def deleteFriend(self, permid): try: fr = self._get('friends') fr.remove(permid) self._put('friends', fr) except: pass def isFriend(self, permid): return permid in self._get('friends') def getFriends(self): friends = self._get('friends') if friends is not None: return list(friends) else: return [] class PeerDB(BasicDB): """ List of Peers, e.g. Host Cache """ __single = None def __init__(self, db_dir=''): if PeerDB.__single: raise RuntimeError, "PeerDB is singleton" self.db_name = 'peers.bsd' self.opened = True self._data = open_db(self.db_name, db_dir) # dbshelve object MyDB.checkVersion(self) PeerDB.__single = self self.default_item = { 'ip':'', 'port':0, 'name':'', 'last_seen':0, 'similarity':0, 'connected_times':0, 'tried_times':0, 'buddycast_times':0, #'trust':50, #'reliability': #'icon':'', } self.new_encountered_peer = True def getInstance(*args, **kw): if PeerDB.__single is None: PeerDB(*args, **kw) return PeerDB.__single getInstance = staticmethod(getInstance) def updateItem(self, permid, item={}, update_dns=True, update_time=True): # insert a peer; update it if existed # if item.has_key('name'): # assert item['name'] != 'qfqf' if isValidPermid(permid) and validDict(item): if self._has_key(permid): _item = self.getItem(permid) if _item is None: # database error, the key exists, but the data ain't there return if not update_dns: if item.has_key('ip'): item.pop('ip') if item.has_key('port'): item.pop('port') _item.update(item) if update_time: _item.update({'last_seen':int(time())}) self._updateItem(permid, _item) else: item = self.setDefaultItem(item) if update_time: item.update({'last_seen':int(time())}) self._put(permid, item) def deleteItem(self, permid): self._delete(permid) def getItem(self, permid, default=False): ret = self._get(permid, None) if ret is None and default: ret = deepcopy(self.default_item) return ret def hasItem(self, permid): return self._has_key(permid) def hasNewEncounteredPeer(self, v): self.new_encountered_peer = v class TorrentDB(BasicDB): """ Database of all torrent files, including the torrents I don't have yet """ __single = None def __init__(self, db_dir=''): if TorrentDB.__single: raise RuntimeError, "TorrentDB is singleton" self.db_name = 'torrents.bsd' self.opened = True self._data = open_db(self.db_name, db_dir) # dbshelve object MyDB.checkVersion(self) TorrentDB.__single = self self.default_item = { 'relevance':0, 'torrent_name':'', # name of the torrent 'torrent_dir':'', # dir+name=full path. Default path if the value is '\x01' 'info':{}, # {name, length, announce, creation date, comment} } self.new_metadata = True def getInstance(*args, **kw): if TorrentDB.__single is None: TorrentDB(*args, **kw) return TorrentDB.__single getInstance = staticmethod(getInstance) def updateItem(self, infohash, item={}): # insert a torrent; update it if existed if isValidInfohash(infohash) and validDict(item): if self._has_key(infohash): _item = self.getItem(infohash) if not _item: print >> sys.stderr, "cachedb: Error in cachedb.TorrentDB.updateItem: database inconsistant!", self._has_key(infohash), self.getItem(infohash) return _item.update(item) self._updateItem(infohash, _item) else: item = self.setDefaultItem(item) self._put(infohash, item) def deleteItem(self, infohash): self._delete(infohash) def getItem(self, infohash, default=False): ret = self._get(infohash, None) if ret is None and default: ret = deepcopy(self.default_item) return ret def hasNewMetadata(self, v): self.new_metadata = v class PreferenceDB(BasicDB): """ Peer * Torrent """ __single = None def __init__(self, db_dir=''): if PreferenceDB.__single: raise RuntimeError, "PreferenceDB is singleton" self.db_name = 'preferences.bsd' self.opened = True self._data = open_db(self.db_name, db_dir) # dbshelve object MyDB.checkVersion(self) PreferenceDB.__single = self self.default_item = { # subitem actually 'relevance':0, # relevance from the owner of this torrent 'rank':0 } def getInstance(*args, **kw): if PreferenceDB.__single is None: PreferenceDB(*args, **kw) return PreferenceDB.__single getInstance = staticmethod(getInstance) def addPreference(self, permid, infohash, data={}): # add or update pref if not isValidPermid(permid) or not isValidInfohash(infohash): return if not self._has_key(permid): data = self.setDefaultItem(data) item = {infohash:data} else: if self.hasPreference(permid, infohash): _data = self.getPreference(permid, infohash) _data.update(data) else: _data = self.setDefaultItem(data) _item = {infohash:_data} item = self.getItem(permid) item.update(_item) self._put(permid, item) def deletePreference(self, permid, infohash): if self._has_key(permid): preferences = self._get(permid) preferences.pop(infohash) self._put(permid, preferences) def getPreference(self, permid, infohash): if self._has_key(permid): preferences = self._get(permid) if preferences.has_key(infohash): return preferences[infohash] return None def hasPreference(self, permid, infohash): if self._has_key(permid): return infohash in self._get(permid) else: return False def deleteItem(self, permid): self._delete(permid) def getItem(self, permid): return self._get(permid, {}) class MyPreferenceDB(BasicDB): # = FileDB __single = None def __init__(self, db_dir=''): if MyPreferenceDB.__single: raise RuntimeError, "TorrentDB is singleton" self.db_name = 'mypreferences.bsd' self.opened = True self._data = open_db(self.db_name, db_dir) # dbshelve object MyDB.checkVersion(self) MyPreferenceDB.__single = self self.default_item = { 'created_time':0, 'rank':0, # -1 ~ 5, as a recommendation degree to others 'content_name':'', # real file name in disk, may be different with info['name'] 'content_dir':'', # dir + name = full path 'last_seen':0, } def getInstance(*args, **kw): if MyPreferenceDB.__single is None: MyPreferenceDB(*args, **kw) return MyPreferenceDB.__single getInstance = staticmethod(getInstance) def updateItem(self, infohash, item={}): # insert a torrent; update it if existed if isValidInfohash(infohash) and validDict(item): if self._has_key(infohash): _item = self.getItem(infohash) _item.update(item) _item.update({'last_seen':int(time())}) self._updateItem(infohash, _item) else: self.default_item['created_time'] = self.default_item['last_seen'] = int(time()) item = self.setDefaultItem(item) self._put(infohash, item) self._sync() def deleteItem(self, infohash): self._delete(infohash) self._sync() def getItem(self, infohash, default=False): ret = self._get(infohash, None) if ret is None and default: ret = deepcopy(self.default_item) return ret def getRank(self, infohash): v = self._get(infohash) if not v: return 0 return v.get('rank', 0) class OwnerDB(BasicDB): """ Torrent * Peer """ __single = None def __init__(self, db_dir=''): if OwnerDB.__single: raise RuntimeError, "OwnerDB is singleton" self.db_name = 'owners.bsd' self.opened = True self._data = open_db(self.db_name, db_dir) # dbshelve object OwnerDB.__single = self def getInstance(*args, **kw): if OwnerDB.__single is None: OwnerDB(*args, **kw) return OwnerDB.__single getInstance = staticmethod(getInstance) def getNumOwners(self, infohash): owners = self._get(infohash) if owners is not None: n = len(owners) else: n = 0 #print n, `infohash`, owners return n def addOwner(self, infohash, permid): if isValidPermid(permid) and isValidInfohash(infohash): if self._has_key(infohash): owners = self._get(infohash) owners.add(permid) self._put(infohash, owners) else: self._put(infohash, Set([permid])) def deleteOwner(self, infohash, permid): try: owners = self._get(infohash) owners.remove(permid) if not owners: # remove the item if it is empty self._delete(infohash) else: self._put(infohash, owners) except: pass def isOwner(self, permid, infohash): if self._has_key(infohash): owners = self._get(infohash) return permid in owners else: return False def deleteItem(self, infohash): self._delete(infohash) def getItem(self, infohash): owners = self._get(infohash) if owners is not None: return list(owners) else: return []