"""
Test package setup for the server test suite.

Provides the PreprocessFiles hook used to order the test files and
directories, and the single run mode (_ServerMode) that initializes a
scratch repository and starts/stops a test server around the tests.
"""
__revision__ = '$Id: __init__.py,v 1.13 2003/11/20 01:14:18 jkloth Exp $'

import os, tempfile, time, sha, socket

from Ft.Lib.TestSuite import TestMode
from Ft.Server.Server import Launcher
from Ft.Server.Server.Lib import LogUtil, ConfigFile
from Ft.Server.FtRpc import FtServerFtRpcException
from Ft.Server.Client import Error, FtServerClientException


def PreprocessFiles(dirs, files):
    """
    This function is responsible for sorting and trimming the file and
    directory lists as needed for proper testing.

    Both lists are handed to RemoveTests/SortTests and then returned as
    the tuple (dirs, files).
    """
    from Ft.Lib.TestSuite import RemoveTests, SortTests

    ignored_files = []
    RemoveTests(files, ignored_files)

    ordered_files = []
    SortTests(files, ordered_files)

    ignored_dirs = []
    RemoveTests(dirs, ignored_dirs)

    # 'Core' is listed ahead of 'Commands' in the requested ordering.
    ordered_dirs = ['Core', 'Commands']
    SortTests(dirs, ordered_dirs)

    return (dirs, files)


# -- run modes -------------------------------------------------------

class TestLauncher(Launcher.Launcher):
    """Launcher whose info() is a no-op."""

    def info(*args):
        pass


# Hex SHA digest of a password string, as passed to the launcher and to
# repository initialization below.
_sha_hash = lambda str: sha.new(str).hexdigest()

# The Dynamic and/or Private Ports are those from 49152 through 65535
FTRPC_HOST = 'localhost'
FTRPC_PORT = 58803


def _server_listening(host, port):
    """
    Return 1 if a TCP connection to host:port succeeds, 0 otherwise.

    The probe socket is always closed, whether or not the connection
    attempt succeeded.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        try:
            probe.connect((host, port))
        except socket.error:
            # No servers listening on the host:port we want
            return 0
        return 1
    finally:
        probe.close()


class _ServerMode(TestMode.TestMode):
    """
    Run mode that creates the test repository once and brackets the tests
    with a server start (_pre) and a server stop (_post).
    """

    # Class-level flag: set once createRepo() has been attempted so that
    # later _init() calls do not build the repository again.
    databaseInited = 0

    def __init__(self):
        self.dbName = 'test_db'
        TestMode.TestMode.__init__(self, "Core Server", 1)

    def _init(self, tester):
        """
        Seed tester.test_data with connection defaults and, on first use,
        write the configuration file and create the repository.

        Returns the result of createRepo() when it is run, otherwise 1.
        """
        # NOTE(review): the extent of this conditional was reconstructed;
        # only the credential defaults are assumed to depend on it.
        if not tester.test_data.has_key('userName'):
            tester.test_data['userName'] = 'root'
            tester.test_data['password'] = 'root'
        tester.test_data['ftrpc-host'] = FTRPC_HOST
        tester.test_data['ftrpc-port'] = FTRPC_PORT

        success = 1
        if not self.databaseInited:
            self.cfgFileName = tempfile.mktemp()
            f = open(self.cfgFileName, 'w')
            try:
                f.write(CFG_FILE)
            finally:
                # Do not leak the handle if the write fails.
                f.close()

            success = self.createRepo(tester)
            # NOTE(review): the flag is set even when createRepo() reports
            # failure, as in the original; confirm that is intended.
            _ServerMode.databaseInited = 1
        return success

    def _launcher(self, tester):
        """Build a TestLauncher from the tester's credentials and the
        properties stored by createRepo()."""
        return TestLauncher(tester.test_data['userName'],
                            _sha_hash(tester.test_data['password']),
                            self._properties)

    def _pre(self, tester):
        """Start the server before the tests run."""
        tester.startTest("Start Server")
        self._launcher(tester).start()
        # Pause after start() before the tests proceed.
        time.sleep(2)
        tester.testDone()
        return

    def _post(self, tester):
        """Stop the server after the tests have run."""
        tester.startTest("Stop Server")
        self._launcher(tester).stop()
        tester.testDone()
        return

    def createRepo(self, tester):
        """
        Initialize a fresh repository for the tests.

        Reads the 'TestCore' properties from the configuration file,
        refuses to continue if something is already listening on
        FTRPC_HOST:FTRPC_PORT, then initializes the repository and
        recreates the servers, docdefs and commands containers.

        Returns 1 on success and 0 if a server is already listening.
        """
        from Ft.Server.Server import SCore
        from Ft.Server.Server.Commands import Init
        from Ft.Server.Common.Install import InstallUtil

        # Needed because of class attribute modifications in these files.
        # Without this, strange exceptions occur when running the complete
        # Server test suite.
        reload(InstallUtil)
        reload(Init)

        self._properties = ConfigFile.Read(self.cfgFileName)['TestCore']
        self._properties['ConfigFile'] = self.cfgFileName
        self._properties['CoreId'] = 'TestCore'
        tester.test_data['properties'] = self._properties

        tester.startTest("Initialize repository")

        # See if there is already a server present
        if _server_listening(FTRPC_HOST, FTRPC_PORT):
            tester.error("Unable to test the client code as there is a server "
                         "listening on %s:%d" % (FTRPC_HOST, FTRPC_PORT))
            tester.testDone()
            return 0

        username = tester.test_data['userName']
        password = tester.test_data['password']
        Init.DoInit(self._properties, username, _sha_hash(password), ['Core'],
                    users=[(username, password)],
                    quiet=1, confirm=0, destroyRepo=1)

        repo = SCore.GetRepository(username, _sha_hash(password),
                                   LogUtil.NullLogger(), self._properties)

        # Add the FtRpc Server
        if repo.hasResource('/ftss/servers'):
            repo.deleteResource('/ftss/servers')
        serverRoot = repo.createContainer('/ftss/servers')
        serverRoot.createDocument('FtRpc.server', FTRPC_SERVER)

        # Add a clean docdefs container
        if repo.hasResource('/ftss/docdefs'):
            repo.deleteResource('/ftss/docdefs')
        repo.createContainer('/ftss/docdefs')

        # Add a clean commands container
        if repo.hasResource('/ftss/commands'):
            repo.deleteResource('/ftss/commands')
        repo.createContainer('/ftss/commands')

        repo.txCommit()
        tester.testDone()

        # Success!
        return 1


MODES = [_ServerMode(),
         ]

# NOTE(review): the two templates below are reproduced exactly as they
# appear in this view; confirm they still match the formats that
# ConfigFile.Read and createDocument expect.
#
# gettempdir() is used instead of the tempfile.tempdir global, which is
# None until the temporary directory has been looked up.
CFG_FILE=""" testserver %s %s %s debug """ % (FTRPC_HOST,
                                              tempfile.mktemp(),
                                              os.path.join(tempfile.gettempdir(), 'log'))

FTRPC_SERVER = """ This is a test server FtRpc ftrpc %d notice FtRpc """ % FTRPC_PORT