# // ****************************************************************************
# // copyright (c) 2000-2006 Horst Knorr
# // This file is part of the hk_classes library.
# // This file may be distributed and/or modified under the terms of the
# // GNU Library Public License version 2 as published by the Free Software
# // Foundation and appearing in the file COPYING included in the
# // packaging of this file.
# // This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
# // WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
# // ****************************************************************************

# Test suite for the hk_classes database drivers: for every driver reported by
# the driver manager it runs a connection test and, where the server can create
# databases, a create/insert/compare/update/delete cycle, then prints a summary.
#
# All print calls pass one pre-formatted string so the output is the same
# whether "print" is a statement or a function.

try:
    import sys
    import types
    import time
    from traceback import *
    from os.path import *
except ImportError:
    print("Main python modules could not be loaded, wrong installation?")
    raise SystemExit(1)

try:
    from hk_classes import *
except ImportError:
    print("hk_classes module could not be loaded, wrong installation?")
    raise SystemExit(1)

dr = hk_drivermanager()
if dr is None:
    print("drivermanager could not be loaded!")
    raise SystemExit(1)

#general definitions
successfull_tests = 0
failed_tests = 0
testnumber = 0
skipped_tests = 0
skipped_messages = []
tested_drivers = []
performed_tests = []
test_warnings = []
# Explanation of the most recent failure/skip; helpers write it, perform_test reads it.
reason = ""

# Result markers returned by every test and helper function.
ok = "OK"
failed = "FAILED"
skipped = "SKIPPED"

defaultdbname = "hkclassestest"
testtablename = "test"

# Definition of the test table
# columntype,columnname,size,is_primary,is_notnull
column_definitions = [
    [hk_column.auto_inccolumn, "id", 4, True, False],
    [hk_column.textcolumn, "text", 255, False, False],
    [hk_column.floatingcolumn, "floating", 0, False, False],
    [hk_column.smallintegercolumn, "smallint", 0, False, False],
    [hk_column.datecolumn, "datecol", 0, False, False],
    [hk_column.timecolumn, "timecol", 0, False, False],
    [hk_column.datetimecolumn, "dtcol", 0, False, False],
]

#test data for creating and updating tables
table_columns = ["text", "smallint", "floating", "datecol", "timecol", "dtcol"]
table_values = [
    ["mytext", 0, 0.0, "05.03.2005", "14:35:00", "01.01.1999 00:00:00"],
    ["mytext2", 255, 255.75, "07.02.1901", "00:01:00", "31.01.2005 13:42:04"],
    ["mytext3", -255, -255.75, "28.11.2006", "03:00:15", "24.12.2005 15:03:00"],
]
update_values = [
    ["myreplacedtext", 5, 9.1, "25.01.1974", "12:00:00", "11.08.1912 03:04:05"],
    ["myreplacedt2", -2, -8.75, "01.03.1965", "01:00:14", "16.01.1913 13:24:35"],
]

#special treatment of login infos
# drivername:{database,logindatabase,user,password,host}
login_info = {
    "firebird": {
        "database": "/opt/firebird/examples/hkclassestest.fdb",
        "logindatabase": "/unknown.fdb",
    },
    "mdb": {"database": "./misc/test.mdb"},
}


#general test unit function definitions
def perform_test(driver, tstfunction, functiondata, description):
    """Run one test function, record its outcome and return the result marker.

    driver       -- driver name, passed to the test and used in messages
    tstfunction  -- callable invoked as tstfunction(driver, functiondata)
    functiondata -- opaque argument forwarded to the test
    description  -- human-readable test name for the report

    Returns ok, failed or skipped. An exception raised by the test, or a
    return value that is none of the three markers, is recorded as failed.
    """
    global failed_tests
    global successfull_tests
    global skipped_tests
    global reason
    global testnumber

    testnumber = testnumber + 1
    if driver not in tested_drivers:
        tested_drivers.append(driver)

    result = failed
    reason = "Unknown"
    try:
        result = tstfunction(driver, functiondata)
    except Exception:
        # sys.exc_info() instead of "except X, e" / "except X as e" so the
        # clause parses on every interpreter version.
        result = failed
        reason = "Exception occured: " + str(sys.exc_info()[1])

    # A test that returns anything else would break the message concatenation
    # below and would not be counted at all, so treat it as a failure.
    if result not in (ok, failed, skipped):
        reason = "Test returned unexpected result: " + repr(result)
        result = failed

    prefix = "[" + str(testnumber) + "]\tDriver:'" + driver + "': " + description
    msg = prefix + "..." + result
    performed_tests.append(msg)

    if result == failed:
        failed_tests = failed_tests + 1
        test_warnings.append(prefix + ": " + reason)
    elif result == ok:
        successfull_tests = successfull_tests + 1
    else:
        skipped_tests = skipped_tests + 1
        skipped_messages.append(msg)
        skipped_messages.append("\t" + reason)
    return result


def print_teststatistic():
    """Print the summary, the list of performed tests and any failures/skips."""
    line = "================================================"
    print(line)
    print("HK_CLASSES Test Suite")
    print("Test summary at %s" % time.strftime("%d.%m.%y at %H:%M"))
    print(line)
    print("Tested drivers: %s" % (tested_drivers,))
    # No blank after "\t": the print statement emitted no separator after a
    # string ending in a tab, and this keeps the output identical.
    print("Total tests:\t%s" % (successfull_tests + failed_tests + skipped_tests))
    print("Failed: \t%s" % failed_tests)
    print("Skipped:\t%s" % skipped_tests)
    print(line)
    print("Performed tests:")
    print(line)
    for tst in performed_tests:
        print(tst)
    for title, messages in (("Failed tests:", test_warnings),
                            ("Skipped tests:", skipped_messages)):
        if messages:
            print("")
            print(title)
            print(line)
            for tst in messages:
                print(tst)
            print("")


#FUNCTIONALITY FUNCTIONS
def testdbname(driver):
    """Return the test database name for driver.

    Uses the "database" entry of login_info when the driver has one,
    otherwise defaultdbname.
    """
    return login_info.get(driver, {}).get("database", defaultdbname)


def new_connection(driver):
    """Create and connect a connection for driver.

    Applies the optional logindatabase/user/password/host entries from
    login_info before connecting non-interactively.

    Returns the connected connection object, or the skipped marker (with
    reason set) when the driver is unknown, no connection object could be
    created, or connecting failed.
    """
    global reason
    if driver not in dr.driverlist():
        reason = "No such driver:'" + driver + "'"
        return skipped
    con = dr.new_connection(driver)
    if con is None:
        reason = "Connection was not possible"
        return skipped

    login = login_info.get(driver, {})
    if "logindatabase" in login:
        con.set_defaultdatabase(login["logindatabase"])
    if "user" in login:
        con.set_user(login["user"])
    if "password" in login:
        con.set_password(login["password"])
    if "host" in login:
        # Was set_password(login["host"]), which clobbered the password and
        # never set the host.
        con.set_host(login["host"])

    con.connect(hk_class.noninteractive)
    if not con.is_connected():
        reason = "Connection can't connect:" + con.last_servermessage()
        return skipped
    return con


def create_database(con, db, deleteexisting=False):
    """Create database db on connection con.

    With deleteexisting true, an already listed database of that name is
    deleted first. Returns ok, failed (reason = last server message) or
    skipped when the server cannot create databases.
    """
    global reason
    if not con.server_supports(hk_connection.SUPPORTS_NEW_DATABASE):
        reason = "Server does not support creating new databases"
        return skipped
    if deleteexisting and db in con.dblist():
        con.delete_database(db, hk_class.noninteractive)
    if con.create_database(db) == 1:
        return ok
    reason = con.last_servermessage()
    return failed


def open_database(driver, dbname, create=False, deleteexistingdb=False, user="root", password="", host=""):
    """Connect with driver and return a database object for dbname.

    With create true the database is created first (see create_database).
    user, password and host are accepted for compatibility but not used;
    login data comes from login_info via new_connection.

    Returns the database object on success, otherwise skipped or failed with
    reason set. On every non-success path the connection opened here is
    disconnected again, because the caller gets no handle to close it.
    """
    global reason
    con = new_connection(driver)
    if con == skipped:
        return skipped
    if create:
        result = create_database(con, dbname, deleteexistingdb)
        if result != ok:
            # create_database already stored the precise cause (server message
            # or "not supported"); prefix it instead of overwriting it.
            reason = "create_database failed: " + reason
            con.disconnect()
            return result
    # A database counts as existing if the server lists it or, for file based
    # drivers, if a file of that name exists.
    if dbname not in con.dblist() and not exists(dbname):
        reason = "database '" + dbname + "' does not exist"
        con.disconnect()
        return failed
    return con.new_database(dbname)


def delete_database(con, db):
    """Delete database db on connection con.

    Returns ok, failed (unknown database or server error, reason set) or
    skipped when the server cannot delete databases.
    """
    global reason
    if not con.server_supports(hk_connection.SUPPORTS_DELETE_DATABASE):
        reason = "Server does not support deleting databases"
        return skipped
    if db not in con.dblist():
        reason = "Database '" + db + "' does not exist, dblist=" + str(con.dblist())
        return failed
    if con.delete_database(db, hk_class.noninteractive) == 1:
        return ok
    reason = con.last_servermessage()
    return failed


def create_table(db, tbl, insert=False):
    """Create table tbl in db according to column_definitions.

    An existing table of that name is deleted first. With insert true the
    rows of table_values are inserted afterwards (the insert result is not
    checked here).

    Returns the table object on success, None on failure (reason set to the
    last server message) or skipped when the server cannot create tables.
    """
    global reason
    if not db.connection().server_supports(hk_connection.SUPPORTS_NEW_TABLE):
        reason = "Server does not support creating new tables"
        return skipped
    if tbl in db.tablelist():
        db.delete_table(tbl, False)

    table = db.new_table()
    table.setmode_createtable()
    table.set_name(tbl)
    for columntype, columnname, size, is_primary, is_notnull in column_definitions:
        c = table.new_column()
        c.set_columntype(columntype)
        c.set_name(columnname)
        c.set_size(size)
        c.set_primary(is_primary)
        c.set_notnull(is_notnull)

    if table.create_table_now() == 1:
        if insert:
            insert_table(table)
        return table
    # Previously the failure path left reason untouched, so the report showed
    # a stale or "Unknown" explanation.
    reason = db.connection().last_servermessage()
    return None


def delete_table(db, tbl):
    """Delete table tbl from db.

    Returns ok, failed (reason set) or skipped when the server cannot delete
    tables.
    """
    global reason
    if not db.connection().server_supports(hk_connection.SUPPORTS_DELETE_TABLE):
        reason = "Server does not support deleting tables"
        return skipped
    if tbl not in db.tablelist():
        reason = "Table does not exist"
        return failed
    if db.delete_table(tbl, False):
        return ok
    reason = "Deleting table failed"
    return failed


def insert_table(tbl):
    """Insert all rows of table_values into tbl.

    The table is enabled if necessary and disabled again before returning.
    Returns ok, or failed with reason set when the table cannot be enabled,
    a column from table_columns is missing, or a row cannot be stored.
    """
    global reason
    if not tbl.is_enabled():
        tbl.enable()
    if not tbl.is_enabled():
        reason = "Table could not be enabled"
        return failed

    for rownumber, row in enumerate(table_values):
        tbl.setmode_insertrow()
        for columnname, value in zip(table_columns, row):
            c = tbl.column_by_name(columnname)
            if c is None:
                reason = "Column:'" + columnname + "' not found"
                tbl.set_ignore_changed_data()
                tbl.disable()
                return failed
            # Pick the setter matching the Python type of the test value.
            if isinstance(value, int):
                c.set_asinteger(value)
            elif isinstance(value, float):
                c.set_asdouble(value)
            else:
                c.set_asstring(value)
        if not tbl.store_changed_data():
            reason = ("Could not add row:" + str(rownumber) + " "
                      + tbl.database().connection().last_servermessage())
            tbl.set_ignore_changed_data()
            tbl.disable()
            return failed
    tbl.disable()
    return ok


# ----------------------------------------------------------------------------
# NOTE(review): a run of source text is missing at this point. Everything
# between a "<" and a later ">" appears to have been stripped from the file,
# which removed the remainder of update_table() and the beginning of the
# function whose tail is shown in fragment 2. The names tst_connection,
# tst_createdatabase, tst_opendatabase, tst_createtable, tst_insertdata,
# tst_comparedata, tst_updatedata, tst_compareupdatedata,
# tst_deletemultiplerows, tst_deletesinglerow, tst_deletetable,
# tst_deletedatabase and tablecontent are referenced below but defined nowhere
# in the visible text -- presumably they were in the lost part as well; restore
# them from a pristine copy of this file. The two surviving fragments cannot
# run as they stand and are preserved here verbatim, as comments.
#
# Fragment 1 -- start of update_table():
#
# def update_table(tbl):
#     global reason
#     if not tbl.is_enabled(): tbl.enable()
#     if not tbl.is_enabled():
#         reason="Table could not be enabled"
#         return failed
#     rownumber=0
#     if tbl.max_rows()            [... text lost from here ...]
#
# Fragment 2 -- tail of a function comparing a table against update_values.
# When restoring it, wrap c.asinteger(), c.asdouble() and col in str() in the
# "Table has ..." messages of the integer and float branches: they are
# concatenated to strings as they stand, which raises TypeError instead of
# reporting the mismatch.
#
#     [... text lost up to here ...] table.max_rows():
#         reason="different amount of rows. Table has "+str(table.max_rows())+", but should have "+str(len(update_values))+tablecontent(table)
#         del db
#         return failed
#     rownumber=0
#     for row in update_values:
#         colid=0
#         for col in row:
#             c=table.column_by_name(table_columns[colid])
#             if c==None:
#                 reason="Column:'"+table_columns[colid]+"' not found"
#                 return failed
#             t=type(col)
#             if t== types.IntType:
#                 if c.asinteger()!=col:
#                     reason="Row:"+str(rownumber)+" Integer in column:'"+table_columns[colid]+"' is different\n\t"
#                     reason=reason+"Table has '"+c.asinteger()+"', should be '"+col+"'"+tablecontent(table)
#                     table.disable()
#                     del db
#                     return failed
#             elif t== types.FloatType:
#                 if c.asdouble()!=col:
#                     reason="Row:"+str(rownumber)+" Float in column:'"+table_columns[colid]+"' is different\n\t"
#                     reason=reason+"Table has '"+c.asdouble()+"', should be '"+col+"'"+tablecontent(table)
#                     table.disable()
#                     del db
#                     return failed
#             else:
#                 if c.asstring()!=col:
#                     reason="Row:"+str(rownumber)+" String in column:'"+table_columns[colid]+"' is different\n\t"
#                     reason=reason+"Table has '"+c.asstring()+"', should be '"+col+"'"+tablecontent(table)
#                     table.disable()
#                     del db
#                     return failed
#             colid=colid+1
#         table.goto_next()
#         rownumber=rownumber+1
#     table.disable()
#     con=db.connection()
#     del db
#     con.disconnect()
#     del con
#     return ok
# ----------------------------------------------------------------------------


def perform_readwritetests(driver):
    """Run the create/insert/compare/update/delete test sequence for driver.

    The sequence stops after the "compare inserted data" step when that step
    is not ok; the results of the other steps are only recorded.
    """
    data = ""
    perform_test(driver, tst_createdatabase, data, "create database test")
    perform_test(driver, tst_opendatabase, testdbname(driver), "open existing database test")
    perform_test(driver, tst_createtable, testtablename, "create table test")
    perform_test(driver, tst_insertdata, testtablename, "insert data test")
    result = perform_test(driver, tst_comparedata, testtablename, "compare inserted data test")
    if result != ok:
        return
    perform_test(driver, tst_updatedata, testtablename, "update data test")
    perform_test(driver, tst_compareupdatedata, testtablename, "compare updated data test")
    perform_test(driver, tst_deletemultiplerows, testtablename, "delete multiple rows test")
    perform_test(driver, tst_deletesinglerow, testtablename, "delete single row test")
    perform_test(driver, tst_deletetable, testtablename, "delete table test")
    perform_test(driver, tst_deletedatabase, testdbname(driver), "delete database test")


def perform_fulldrivertest(driver):
    """Run the connection test and, if the server can create databases, the
    read/write tests for driver.
    """
    data = ""
    result = perform_test(driver, tst_connection, data, "connection test")
    if result != ok:
        # Failed or skipped: there is nothing to run the remaining tests on.
        return
    con = new_connection(driver)
    if con == skipped:
        # new_connection returns the skipped marker instead of an object.
        return
    can_create = con.server_supports(hk_connection.SUPPORTS_NEW_DATABASE)
    # Always close the probe connection, not only when the tests are run.
    con.disconnect()
    del con
    if can_create:
        perform_readwritetests(driver)


def perform_readonlydrivertest(driver, database, table):
    """Run the "open database" test for driver against database.

    table is currently unused. The previously hard-coded path is only used
    as a fallback when database is empty, instead of always replacing the
    argument.
    """
    if not database:
        database = "/home/horst/encoding.mdb"
    perform_test(driver, tst_opendatabase, database, "open database test")


def perform_hk_classestests(driver):
    """Placeholder for tests of hk_classes itself; only prints a marker."""
    print("hk_classestests")


#define performed tests
for driver in dr.driverlist():
    perform_fulldrivertest(driver)

#show statistics
print_teststatistic()