__revision__ = '$Id: __init__.py,v 1.13 2003/11/20 01:14:18 jkloth Exp $'
import os, tempfile, time, sha, socket
from Ft.Lib.TestSuite import TestMode
from Ft.Server.Server import Launcher
from Ft.Server.Server.Lib import LogUtil, ConfigFile
from Ft.Server.FtRpc import FtServerFtRpcException
from Ft.Server.Client import Error, FtServerClientException
def PreprocessFiles(dirs, files):
"""
This function is responsible for sorting and trimming the
file and directory lists as needed for proper testing.
"""
from Ft.Lib.TestSuite import RemoveTests, SortTests
ignored_files = []
RemoveTests(files, ignored_files)
ordered_files = []
SortTests(files, ordered_files)
ignored_dirs = []
RemoveTests(dirs, ignored_dirs)
ordered_dirs = ['Core','Commands']
SortTests(dirs, ordered_dirs)
return (dirs, files)
# -- run modes -------------------------------------------------------
class TestLauncher(Launcher.Launcher):
def info(*args):
pass
_sha_hash = lambda str: sha.new(str).hexdigest()
# The Dynamic and/or Private Ports are those from 49152 through 65535
FTRPC_HOST = 'localhost'
FTRPC_PORT = 58803
class _ServerMode(TestMode.TestMode):
databaseInited = 0
def __init__(self):
self.dbName = 'test_db'
TestMode.TestMode.__init__(self, "Core Server", 1)
def _init(self, tester):
if not tester.test_data.has_key('userName'):
tester.test_data['userName'] = 'root'
tester.test_data['password'] = 'root'
tester.test_data['ftrpc-host'] = FTRPC_HOST
tester.test_data['ftrpc-port'] = FTRPC_PORT
success = 1
if not self.databaseInited:
self.cfgFileName = tempfile.mktemp()
f = open(self.cfgFileName, 'w')
f.write(CFG_FILE)
f.close()
success = self.createRepo(tester)
_ServerMode.databaseInited = 1
return success
def _pre(self, tester):
tester.startTest("Start Server")
launcher = TestLauncher(tester.test_data['userName'],
_sha_hash(tester.test_data['password']),
self._properties)
launcher.start()
time.sleep(2)
tester.testDone()
return
def _post(self, tester):
tester.startTest("Stop Server")
launcher = TestLauncher(tester.test_data['userName'],
_sha_hash(tester.test_data['password']),
self._properties)
launcher.stop()
tester.testDone()
return
def createRepo(self,tester):
from Ft.Server.Server import SCore
from Ft.Server.Server.Commands import Init
from Ft.Server.Common.Install import InstallUtil
# Needed because of class attribute modifications in these files.
# Without this, strange exceptions occur when running the complete
# Server test suite.
reload(InstallUtil)
reload(Init)
self._properties = ConfigFile.Read(self.cfgFileName)['TestCore']
self._properties['ConfigFile'] = self.cfgFileName
self._properties['CoreId'] = 'TestCore'
tester.test_data['properties'] = self._properties
tester.startTest("Initialize repository")
# See if there is already a server present
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((FTRPC_HOST, FTRPC_PORT))
except socket.error:
# No servers listening on the host:port we want
pass
else:
tester.error("Unable to test the client code as there is a server "
"listening on %s:%d" % (FTRPC_HOST, FTRPC_PORT))
tester.testDone()
return 0
username = tester.test_data['userName']
password = tester.test_data['password']
Init.DoInit(self._properties, username, _sha_hash(password),
['Core'], users=[(username, password)],
quiet=1, confirm=0, destroyRepo=1)
repo = SCore.GetRepository(username, _sha_hash(password),
LogUtil.NullLogger(), self._properties)
# Add the FtRpc Server
if repo.hasResource('/ftss/servers'):
repo.deleteResource('/ftss/servers')
serverRoot = repo.createContainer('/ftss/servers')
serverRoot.createDocument('FtRpc.server', FTRPC_SERVER)
# Add a clean docdefs container
if repo.hasResource('/ftss/docdefs'):
repo.deleteResource('/ftss/docdefs')
repo.createContainer('/ftss/docdefs')
# Add a clean commands container
if repo.hasResource('/ftss/commands'):
repo.deleteResource('/ftss/commands')
repo.createContainer('/ftss/commands')
repo.txCommit()
tester.testDone()
# Success!
return 1
MODES = [_ServerMode(),
]
CFG_FILE="""
testserver
%s
%s
%s
debug
""" % (FTRPC_HOST,
tempfile.mktemp(),
os.path.join(tempfile.tempdir, 'log'))
FTRPC_SERVER = """
This is a test server
FtRpc
ftrpc
%d
notice
FtRpc
""" % FTRPC_PORT