import os import tempfile import unittest import socket import shutil from copy import deepcopy from sets import Set import Swapper.CacheDB.cachedb as cachedb class TestBasicDB(unittest.TestCase): def setUp(self): self.dirname = tempfile.mkdtemp() self.d = cachedb.BasicDB(self.dirname) def tearDown(self): self.d.close() try: shutil.rmtree(self.dirname) except Exception, msg: print "tearDown problem:",Exception, msg, self.dirname, "not removed" def test_basic(self): # test _put, _get, _has_key, _size self.d._put('a', 123) assert self.d._has_key('a') assert not self.d._has_key('b') assert self.d._get('a') == 123 self.d._put('c', {'1':1,'2':2}) assert self.d._size() == 2 # test _delete self.d._put('b', 222) assert self.d._has_key('b') self.d._delete('b') assert not self.d._has_key('b') # test _updateItem self.d._updateItem('a', 123) # insert assert self.d._get('a') == 123, self.d._items() self.d._updateItem('a', 321) assert self.d._get('a') == 321, self.d._get('a') self.d._updateItem('c', {'2':22, '3':3}) assert self.d._get('c') == {'1':1, '2':22, '3':3}, self.d._get('c') # test setDefaultItem x = self.d.default_item y = self.d.setDefaultItem({}) assert y == x, x z = {'a':123, 'd':3} y = self.d.setDefaultItem(z) x.update(z) assert y == x, y def test_sync(self): # write data from mem to disk self.d._put('k', 10) #self.d.close() # del already closes db del self.d assert not hasattr(self, 'd') self.d = cachedb.BasicDB(self.dirname) x = self.d._get('k') assert x == 10, x class TestMyDB(unittest.TestCase): def setUp(self): self.dirname = os.path.join(tempfile.gettempdir(), 'testdb') self.name = socket.gethostname() myinfo = {'permid':'my_permid', 'ip':'1.2.3.4', 'name':self.name} self.d = cachedb.MyDB.getInstance(myinfo=myinfo, db_dir=self.dirname) self.d.initData(myinfo) def tearDown(self): self.d._clear() def test_initData(self): init = { 'version':cachedb.curr_version, 'permid':'my_permid', 'ip':'1.2.3.4', 'port':0, 'name':self.name, 'torrent_path':'', 'prefxchg_queue':[], 'bootstrapping':1, 'max_num_torrents':100000, 'max_num_my_preferences':1000, 'superpeers':Set(), 'friends':Set(), } assert self.d._size() == len(init), self.d._data y = {'ip':'1.2.3.4', 'permid':'permid 1'} # only ip, permid, torrent_path can be updated init.update(y) init.update({'name':'1n;2_qfq2v@$'}) self.d.initData(init) assert self.d._get('ip') == '1.2.3.4', self.d._get('ip') assert self.d._get('permid') == 'permid 1', self.d._get('permid') assert self.d._get('name') != '1n;2_qfq2v@$', self.d._get('name') def test_superpeers(self): # test addSuperPeer, deleteSuperPeer, isSuperPeer, getSuperPeers self.d.addSuperPeer('sp1') self.d.addSuperPeer('sp2') self.d.addSuperPeer('sp1') assert self.d.isSuperPeer('sp1'), self.d._data['superpeers'] assert self.d.isSuperPeer('sp2'), self.d._data['superpeers'] sp = self.d.getSuperPeers() assert isinstance(sp, list) assert Set(sp) == Set(['sp1', 'sp2']) self.d.deleteSuperPeer('sp2') assert not self.d.isSuperPeer('sp2') def test_friends(self): # test addFriend, isFriend, getFriends, deleteFriend self.d.addFriend('friend1') self.d.addFriend('friend2') self.d.addFriend('friend1') assert self.d.isFriend('friend1'), self.d._data['friends'] assert self.d.isFriend('friend2'), self.d._data['friends'] friends = self.d.getFriends() assert isinstance(friends, list) assert Set(friends) == Set(['friend1', 'friend2']) self.d.deleteFriend('friend2') assert not self.d.isFriend('friend2') class TestPeerDB(unittest.TestCase): def setUp(self): self.dirname = os.path.join(tempfile.gettempdir(), 'testdb') self.d = cachedb.PeerDB.getInstance(db_dir=self.dirname) def tearDown(self): self.d._clear() def test_peerdb(self): # test updateItem, getItem, deleteItem self.d.updateItem('peer1',update_time=False) assert self.d.getItem('peer1'), self.d.getItem('peer1') assert self.d.getItem('peer1') == self.d.default_item, self.d.getItem('peer1') x = {'ip':'1.2.3.4', 'port':3} y = deepcopy(self.d.default_item) y.update(x) self.d.updateItem('peer1', x, update_time=False) assert self.d.getItem('peer1') == y, self.d.getItem('peer1') x2 = {'port':7} y = deepcopy(self.d.default_item) y.update(x) y.update(x2) self.d.updateItem('peer1', x2, update_time=False) assert self.d.getItem('peer1') == y, self.d.getItem('peer1') self.d.deleteItem('peer1') assert not self.d.getItem('peer1') class TestTorrentDB(unittest.TestCase): def setUp(self): self.dirname = os.path.join(tempfile.gettempdir(), 'testdb') self.d = cachedb.TorrentDB.getInstance(db_dir=self.dirname) self.d._clear() def tearDown(self): self.d._clear() def test_torrentdb(self): # test updateItem, getItem, deleteItem self.d.updateItem('torrent1') assert self.d.getItem('torrent1'), self.d.getItem('torrent1') assert self.d.getItem('torrent1') == self.d.default_item, self.d.getItem('torrent1') x = {'name':'torrent 1', 'relevance':32} y = deepcopy(self.d.default_item) y.update(x) self.d.updateItem('torrent1', x) assert self.d.getItem('torrent1') == y, self.d.getItem('torrent1') x2 = {'relevance':56} y = deepcopy(self.d.default_item) y.update(x) y.update(x2) self.d.updateItem('torrent1', x2) assert self.d.getItem('torrent1') == y, self.d.getItem('torrent1') self.d.deleteItem('torrent1') assert not self.d.getItem('torrent1') class TestPreferenceDB(unittest.TestCase): def setUp(self): self.dirname = os.path.join(tempfile.gettempdir(), 'testdb') self.d = cachedb.PreferenceDB.getInstance(db_dir=self.dirname) self.d._clear() def tearDown(self): self.d._clear() def test_prefdb(self): # test addPreference, getPreference, hasPreference, getItem self.d.addPreference('peer1', 'torrent1') self.d.addPreference('peer1', 'torrent2') assert self.d.hasPreference('peer1', 'torrent2') it1 = self.d.getItem('peer1') assert isinstance(it1, dict) and len(it1) == 2, it1 self.d.addPreference('peer2', 'torrent3') pf2 = self.d.getPreference('peer2', 'torrent1') assert pf2 is None, pf2 self.d.addPreference('peer2', 'torrent5') pf2 = self.d.getPreference('peer2', 'torrent5') assert pf2 == self.d.default_item, pf2 pf2 = self.d.getPreference('peer2', 'torrent3') assert pf2 == self.d.default_item, pf2 z = {'rank':3, 'relevance':5} self.d.addPreference('peer2', 'torrent3', z) x = deepcopy(self.d.default_item) x.update(z) pf2 = self.d.getPreference('peer2', 'torrent3') assert pf2 == x, pf2 z2 = {'relevance':7} self.d.addPreference('peer2', 'torrent3', z2) x = deepcopy(self.d.default_item) x.update(z) x.update(z2) pf2 = self.d.getPreference('peer2', 'torrent3') assert pf2 == x, pf2 # test deletePreference, deleteItem self.d.deletePreference('peer1', 'torrent2') assert not self.d.hasPreference('peer1', 'torrent2') self.d.deleteItem('peer2') assert not self.d.getItem('peer2') class TestMyPreferenceDB(unittest.TestCase): def setUp(self): self.dirname = os.path.join(tempfile.gettempdir(), 'testdb') self.d = cachedb.MyPreferenceDB.getInstance(db_dir=self.dirname) self.d._clear() def tearDown(self): self.d._clear() def test_myprefdb(self): # test updateItem, getItem, deleteItem self.d.updateItem('torrent1') assert self.d.getItem('torrent1'), self.d.getItem('torrent1') assert self.d.getItem('torrent1') == self.d.default_item, self.d.getItem('torrent1') x = {'name':'torrent 1', 'rank':3} y = deepcopy(self.d.default_item) y.update(x) self.d.updateItem('torrent1', x) item = self.d.getItem('torrent1') assert item['name'] == y['name'] and item['rank'] == y['rank'], self.d.getItem('torrent1') x2 = {'rank':'5'} y = deepcopy(self.d.default_item) y.update(x) y.update(x2) self.d.updateItem('torrent1', x2) item = self.d.getItem('torrent1') assert item['name'] == y['name'] and item['rank'] == y['rank'], self.d.getItem('torrent1') self.d.deleteItem('torrent1') assert not self.d.getItem('torrent1') class TestOwnerDB(unittest.TestCase): def setUp(self): self.dirname = os.path.join(tempfile.gettempdir(), 'testdb') self.d = cachedb.OwnerDB.getInstance(db_dir=self.dirname) def tearDown(self): self.d._clear() def test_owner(self): # test addOwner, getOwner, isOwner, getItem self.d.addOwner('torrent1', 'peer1') self.d.addOwner('torrent1', 'peer2') assert self.d.isOwner('peer2', 'torrent1') it1 = self.d.getItem('torrent1') assert isinstance(it1, list) assert len(it1) == 2, len(it1) self.d.addOwner('torrent2', 'peer3') self.d.addOwner('torrent2', 'peer3') it2 = self.d.getItem('torrent2') assert len(it2) == 1, len(it2) # test deleteOwner, deleteItem self.d.deleteOwner('torrent2', 'peer3') assert not self.d.isOwner('peer3', 'torrent2') self.d.deleteItem('torrent1') assert not self.d.getItem('torrent1') def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestBasicDB)) suite.addTest(unittest.makeSuite(TestMyDB)) suite.addTest(unittest.makeSuite(TestPeerDB)) suite.addTest(unittest.makeSuite(TestTorrentDB)) suite.addTest(unittest.makeSuite(TestPreferenceDB)) suite.addTest(unittest.makeSuite(TestMyPreferenceDB)) suite.addTest(unittest.makeSuite(TestOwnerDB)) return suite