import sys

from Swapper.CacheDB.CacheDBHandler import TorrentDBHandler, PeerDBHandler
from Swapper.utilities import sortList

DEBUG = False


def _debug(*args):
    """Write a space-separated debug line to stderr.

    Used instead of the ``print`` statement so the module parses under both
    Python 2 and Python 3, and so all debug output goes to the same stream.
    """
    sys.stderr.write(' '.join([str(a) for a in args]) + '\n')


class TorrentFetcher:
    """Selects torrents that still lack metadata and the peers to ask for it.

    ``todo_cache`` maps an infohash to the owners returned by the torrent
    database; ``done_cache`` records infohashes already handed out by
    ``getTask`` so they are not selected again by ``_reload``.
    """

    def __init__(self, size=10, db_dir=''):
        """Create a fetcher that keeps at most ``size`` torrents queued.

        ``db_dir`` is passed unchanged to both database handlers.
        """
        self.size = size
        self.torrent_db = TorrentDBHandler(db_dir=db_dir)
        self.peer_db = PeerDBHandler(db_dir=db_dir)
        self.todo_cache = {}
        # NOTE(review): entries are never removed from done_cache, so it
        # grows for the lifetime of the object - confirm this is acceptable.
        self.done_cache = {}

    def _reload(self):
        """Reload some torrents into the todo cache."""

        def _not_cached(infohash):
            # True for torrents that are neither pending nor already handed out.
            return (infohash not in self.todo_cache) and \
                   (infohash not in self.done_cache)

        empty_torrents = self.torrent_db.getNoMetaTorrents()
        # TODO: performance improve - check if db has changed
        # Build a real list (not a lazy filter object) because it is passed
        # to the database handler and to sortList below.
        empty_torrents = [t for t in empty_torrents if _not_cached(t)]
        relevance = self.torrent_db.getTorrentsValue(empty_torrents, ['relevance'])
        len_toadd = max(self.size - len(self.todo_cache), 0)
        # presumably sortList orders the torrents by their relevance values;
        # verify against Swapper.utilities.
        recom_list = sortList(empty_torrents, relevance)[:len_toadd]
        for t in recom_list:
            owners = self.torrent_db.getOwners(t)
            if owners:    # torrents without any owner are skipped
                self.todo_cache[t] = owners

    def getTask(self, num_owners=6):
        """Select a torrent infohash to collect its metadata.

        Returns ``(infohash, owners)`` where ``owners`` holds at most
        ``num_owners`` entries, or ``None`` when no torrent is available
        even after reloading the todo cache.
        """
        if not self.todo_cache:
            self._reload()
        if not self.todo_cache:    # no torrent
            return None
        # list(d)[0] instead of d.keys()[0]: dict views are not indexable
        # on Python 3, and the cache holds at most ``size`` entries.
        torrent = list(self.todo_cache)[0]
        all_owners = self.todo_cache.pop(torrent)
        ages = self.peer_db.getPeersValue(all_owners, ['last_seen'])
        owners = sortList(all_owners, ages)[:num_owners]
        self.done_cache[torrent] = None
        return (torrent, owners)

    def hasMetaData(self, infohash):
        """Return the torrent database's answer for ``hasMetaData(infohash)``."""
        return self.torrent_db.hasMetaData(infohash)


class JobQueue:
    """Fixed-size ring of ``(infohash, owners)`` tasks visited round-robin."""

    def __init__(self, maxsize, num_owners, db_dir=''):
        """Create ``maxsize`` empty slots and the fetcher that fills them."""
        self.maxsize = maxsize
        self.num_owners = num_owners
        self._queue = [None] * maxsize
        self.pointer = 0    # check pointer
        self.fetcher = TorrentFetcher(self.maxsize, db_dir)

    def load(self):
        """Fill every empty slot with a task from the fetcher.

        The queue is pre-sized to ``maxsize`` slots holding ``None``, so
        vacancies are filled in place (appending would grow the ring past
        ``maxsize``). Stops early once the fetcher has no more tasks.
        """
        for index, slot in enumerate(self._queue):
            if slot is not None:
                continue
            task = self.fetcher.getTask(self.num_owners)
            if not task:
                break
            self._queue[index] = task

    def getJob(self):
        """Return the next ``(infohash, permid)`` pair, or ``None``.

        The slot under the pointer is replaced with a new task if there is a
        vacancy, all of its owners have been used, or the job has been done
        (the torrent already has metadata). The pointer then advances and
        wraps at ``maxsize``. One owner is popped from the slot's owner list
        per call; ``None`` is returned when the slot has no task or owners.
        """
        if not self._queue:    # maxsize == 0: nothing can ever be queued
            return None
        if DEBUG:
            _debug(">>>", self.pointer)
        current = self._queue[self.pointer]
        if not current or not current[1] or \
           self.fetcher.hasMetaData(current[0]):
            task = self.fetcher.getTask(self.num_owners)
            self._queue[self.pointer] = task
            if DEBUG:
                if task is None:
                    _debug("** new task", task)
                else:
                    _debug("** new task", task[0], task[1][:3])
                if self._queue[self.pointer]:
                    _debug("** ",
                           self.fetcher.hasMetaData(self._queue[self.pointer][0]),
                           self._queue[self.pointer][1][:3])

        task = self._queue[self.pointer]
        self.pointer += 1
        if self.pointer >= self.maxsize:
            self.pointer = 0
        if task:
            infohash, owners = task
            if owners:
                permid = owners.pop(0)
                return infohash, permid
        return None


# TODO: implement click and download
class TorrentCollecting:
    """Singleton that periodically requests metadata for queued torrents."""

    __single = None

    def __init__(self, db_dir=''):
        """Create the single instance.

        Raises ``RuntimeError`` if an instance already exists; use
        ``getInstance`` to obtain the shared one.
        """
        if TorrentCollecting.__single:
            raise RuntimeError("TorrentCollecting is singleton")
        TorrentCollecting.__single = self
        self.registered = False
        self.collect_interval = 11    # use prime to avoid conflict
        self.queue_length = 31
        self.num_owners = 6    # max number of owners of a torrent
        self.job_queue = JobQueue(self.queue_length, self.num_owners, db_dir)

    def getInstance(*args, **kw):
        """Return the shared instance, creating it from the arguments if needed."""
        if TorrentCollecting.__single is None:
            TorrentCollecting(*args, **kw)
        return TorrentCollecting.__single
    getInstance = staticmethod(getInstance)

    def register(self, secure_overlay, rawserver, metadata_handler):
        """Store the collaborators and start collecting; only the first call has any effect."""
        if not self.registered:
            self.secure_overlay = secure_overlay
            self.rawserver = rawserver
            self.metadata_handler = metadata_handler
            self.registered = True
            self.startup()

    def startup(self):
        """Schedule the first ``collect`` call once the instance is registered."""
        #self.job_queue.load()
        if self.registered:
            if DEBUG:
                _debug("collect: Torrent collecting starts up")
            self.rawserver.add_task(self.collect, self.collect_interval)

    def collect(self):
        """Reschedule itself, then send one metadata request if a job is available."""
        # Reschedule first so the cycle continues regardless of what follows.
        self.rawserver.add_task(self.collect, self.collect_interval)
        job = self.job_queue.getJob()
        if job:
            infohash, permid = job
            self.metadata_handler.send_metadata_request(permid, infohash)
            if DEBUG:
                _debug("got job: ", permid, infohash)
                for entry in self.job_queue._queue:
                    if entry is None:
                        _debug(entry)
                    else:
                        queued_hash, queued_owners = entry
                        _debug(queued_hash, len(queued_owners), queued_owners[:3])
                _debug('-------------')

    def test(self):
        """Placeholder; currently does nothing."""
        #self.rawserver.add_task(self.test, self.test_interval)
        pass