#!@PYTHON@ '''CTS: Cluster Testing System: Tests module There are a few things we want to do here: ''' __copyright__=''' Copyright (C) 2000, 2001 Alan Robertson Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import CTS from CM_hb import HBConfig import CTSaudits import time, os, re, types, string, tempfile # List of all class objects for tests which we ought to # consider running. class RandomTests: ''' A collection of tests which are run at random. ''' def __init__(self, scenario, cm, tests, Audits): self.CM = cm self.Env = cm.Env self.Scenario = scenario self.Tests = [] for test in tests: if not issubclass(test.__class__, CTSTest): raise ValueError("Init value must be a subclass of CTSTest") if test.is_applicable(): self.Tests.append(test) if not scenario.IsApplicable(): raise ValueError("Scenario not applicable in" " given Environment") self.Stats = {"success":0, "failure":0, "BadNews":0} self.IndividualStats= {} self.Audits = Audits def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def run(self, max=1): ( ''' Set up the given scenario, then run the selected tests at random for the selected number of iterations. ''') if not self.Scenario.SetUp(self.CM): return None BadNews=CTS.LogWatcher(self.CM["LogFileName"], self.CM["BadRegexes"] , timeout=0) BadNews.setwatch() testcount=1 time.sleep(30) # This makes sure everything is stabilized before starting... for audit in self.Audits: if not audit(): self.CM.log("Audit " + audit.name() + " Failed.") self.incr("auditfail") while testcount <= max: test = self.Env.RandomGen.choice(self.Tests) # Some tests want a node as an argument. nodechoice = self.Env.RandomNode() self.CM.log("Running test %s (%s) [%d]" % (test.name, nodechoice, testcount)) testcount = testcount + 1 starttime=time.time() ret=test(nodechoice) if ret: self.incr("success"); else: self.incr("failure"); # Better get the current info from the cluster... self.CM.statall() stoptime=time.time() elapsed_time = stoptime - starttime if not test.has_key("min_time"): test["elapsed_time"] = elapsed_time test["min_time"] = elapsed_time test["max_time"] = elapsed_time else: test["elapsed_time"] = test["elapsed_time"] + elapsed_time if elapsed_time < test["min_time"]: test["min_time"] = elapsed_time if elapsed_time > test["max_time"]: test["max_time"] = elapsed_time errcount=0 while errcount < 20: match=BadNews.look() if match: ignorelist=test.errorstoignore() ignorelist.append(" CTS: ") for ignore in ignorelist: if re.search(ignore, match): break else: self.CM.log(match) self.incr("BadNews"); errcount=errcount+1 else: break else: self.CM.log("Big problems. Shutting down.") self.CM.stopall() raise ValueError("Looks like we hit the jackpot! :-)") for audit in self.Audits: if not audit(): self.CM.log("Audit " + audit.name() + " Failed.") test.incr("auditfail") self.incr("auditfail") self.Scenario.TearDown(self.CM) for test in self.Tests: self.IndividualStats[test.name] = test.Stats return self.Stats, self.IndividualStats AllTestClasses = [ ] class CTSTest: ''' A Cluster test. We implement the basic set of properties and behaviors for a generic cluster test. Cluster tests track their own statistics. We keep each of the kinds of counts we track as separate {name,value} pairs. ''' def __init__(self, cm): #self.name="the unnamed test" self.Stats = {"calls":0 , "success":0 , "failure":0 , "skipped":0 , "auditfail":0} # if not issubclass(cm.__class__, ClusterManager): # raise ValueError("Must be a ClusterManager object") self.CM = cm self.timeout=120 def has_key(self, key): return self.Stats.has_key(key) def __setitem__(self, key, value): self.Stats[key] = value def __getitem__(self, key): return self.Stats[key] def incr(self, name): '''Increment (or initialize) the value associated with the given name''' if not self.Stats.has_key(name): self.Stats[name]=0 self.Stats[name] = self.Stats[name]+1 def failure(self, reason="none"): '''Increment the failure count''' self.incr("failure") self.CM.log("Test " + self.name + " failed [reason:" + reason + "]") return None def success(self): '''Increment the success count''' self.incr("success") return 1 def skipped(self): '''Increment the skipped count''' self.incr("skipped") return 1 def __call__(self, node): '''Perform the given test''' raise ValueError("Abstract Class member (__call__)") self.incr("calls") return self.failure() def is_applicable(self): '''Return TRUE if we are applicable in the current test configuration''' raise ValueError("Abstract Class member (is_applicable)") return 1 def canrunnow(self): '''Return TRUE if we can meaningfully run right now''' return 1 def errorstoignore(self): '''Return list of errors which are 'normal' and should be ignored''' return [] ################################################################### class StopTest(CTSTest): ################################################################### '''Stop (deactivate) the cluster manager on a node''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name="stop" self.uspat = self.CM["Pat:We_stopped"] self.thempat = self.CM["Pat:They_stopped"] self.allpat = self.CM["Pat:All_stopped"] def __call__(self, node): '''Perform the 'stop' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["up"]: return self.skipped() if node == self.CM.OurNode: self.incr("us") pat = self.uspat else: if self.CM.upcount() <= 1: self.incr("all") pat = (self.allpat % node) else: self.incr("them") pat = (self.thempat % node) watch = CTS.LogWatcher(self.CM["LogFileName"], [pat] , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.StopaCM(node) if watch.look(): return self.success() else: return self.failure("no match against %s "% pat) # # We don't register StopTest because it's better when called by # another test... # ################################################################### class StartTest(CTSTest): ################################################################### '''Start (activate) the cluster manager on a node''' def __init__(self, cm, debug=None): CTSTest.__init__(self,cm) self.name="start" self.uspat = self.CM["Pat:We_started"] self.thempat = self.CM["Pat:They_started"] self.debug = debug def __call__(self, node): '''Perform the 'start' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["down"]: return self.skipped() if node == self.CM.OurNode or self.CM.upcount() < 1: self.incr("us") pat = (self.uspat % node) else: self.incr("them") pat = (self.thempat % node) watch = CTS.LogWatcher(self.CM["LogFileName"], [pat] , timeout=self.CM["StartTime"]+10, debug=self.debug) watch.setwatch() self.CM.StartaCM(node) if watch.look(): return self.success() else: self.CM.log("START FAILURE: did not find pattern " + pat) self.CM.log("START TIMEOUT = %d " % self.CM["StartTime"]) return self.failure("did not find pattern " + pat) def is_applicable(self): '''StartTest is always applicable''' return 1 # # We don't register StartTest because it's better when called by # another test... # ################################################################### class FlipTest(CTSTest): ################################################################### '''If it's running, stop it. If it's stopped start it. Overthrow the status quo... ''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="flip" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'flip' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["up"]: self.incr("stopped") ret = self.stop(node) type="up->down" # Give the cluster time to recognize it's gone... time.sleep(self.CM["DeadTime"]+2) elif self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("started") ret = self.start(node) type="down->up" else: return self.skipped() self.incr(type) if ret: return self.success() else: return self.failure("%s failure" % type) def is_applicable(self): '''FlipTest is always applicable''' return 1 # Register FlipTest as a good test to run AllTestClasses.append(FlipTest) ################################################################### class RestartTest(CTSTest): ################################################################### '''Stop and restart a node''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="Restart" self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, node): '''Perform the 'restart' test. ''' self.incr("calls") self.incr("node:" + node) if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") self.start(node) ret1 = self.stop(node) # Give the cluster time to recognize we're gone... time.sleep(self.CM["DeadTime"]+2) ret2 = self.start(node) if not ret1: return self.failure("stop failure") if not ret2: return self.failure("start failure") return self.success() def is_applicable(self): '''RestartTest is always applicable''' return 1 # Register RestartTest as a good test to run AllTestClasses.append(RestartTest) ################################################################### class StonithTest(CTSTest): ################################################################### '''Reboot a node by whacking it with stonith.''' def __init__(self, cm, timeout=600): CTSTest.__init__(self,cm) self.name="Stonith" self.theystopped = self.CM["Pat:They_stopped"] self.allstopped = self.CM["Pat:All_stopped"] self.usstart = self.CM["Pat:We_started"] self.themstart = self.CM["Pat:They_started"] self.timeout = timeout def __call__(self, node): '''Perform the 'stonith' test. (whack the node)''' self.incr("calls") stopwatch = None # Figure out what log message to look for when/if it goes down if self.CM.ShouldBeStatus[node] != self.CM["down"]: if self.CM.upcount() != 1: stopwatch = (self.theystopped % node) # Figure out what log message to look for when it comes up if (self.CM.upcount() <= 1): uppat = (self.usstart % node) else: uppat = (self.themstart % node) upwatch = CTS.LogWatcher(self.CM["LogFileName"], [uppat] , timeout=self.timeout) if stopwatch: watch = CTS.LogWatcher(self.CM["LogFileName"], [stopwatch] , timeout=self.CM["DeadTime"]+10) watch.setwatch() # Reset (stonith) the node StonithWorked=None for tries in 1,2,3,4,5: if self.CM.Env.ResetNode(node): StonithWorked=1 break if not StonithWorked: return self.failure("Stonith failure") upwatch.setwatch() # Look() and see if the machine went down if stopwatch: if watch.look(): ret1=1 else: reason="Did not find " + stopwatch ret1=0 else: ret1=1 # Look() and see if the machine came back up if upwatch.look(): ret2=1 else: reason="Did not find " + uppat ret2=0 self.CM.ShouldBeStatus[node] = self.CM["up"] if ret1 and ret2: return self.success() else: return self.failure(reason) def is_applicable(self): '''StonithTest is applicable unless suppressed by CM.Env["DoStonith"] == FALSE''' if self.CM.Env.has_key("DoStonith"): return self.CM.Env["DoStonith"] return 1 # Register StonithTest as a good test to run AllTestClasses.append(StonithTest) ################################################################### class IPaddrtest(CTSTest): ################################################################### '''Find the machine supporting a particular IP address, and knock it down. [Hint: This code isn't finished yet...] ''' def __init__(self, cm, IPaddrs): CTSTest.__init__(self,cm) self.name="IPaddrtest" self.IPaddrs = IPaddrs self.start = StartTest(cm) self.stop = StopTest(cm) def __call__(self, IPaddr): ''' Perform the IPaddr test... ''' self.incr("calls") node = self.CM.Env.RandomNode() self.incr("node:" + node) if self.CM.ShouldBeStatus[node] == self.CM["down"]: self.incr("WasStopped") self.start(node) ret1 = self.stop(node) # Give the cluster time to recognize we're gone... time.sleep(self.CM["DeadTime"]+10) ret2 = self.start(node) if not ret1: return self.failure("Could not stop") if not ret2: return self.failure("Could not start") return self.success() def is_applicable(self): '''IPaddrtest is always applicable (but shouldn't be)''' return 1 ################################################################### class StartOnebyOne(CTSTest): ################################################################### '''Start all the nodes ~ one by one''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="StartOnebyOne" def __call__(self, dummy): '''Perform the 'StartOnebyOne' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] != self.CM["down"]: self.incr("stops") self.stop = StopTest(self.CM) self.stop(node) watchpats = [ ] pat = self.CM["Pat:We_started"] for node in self.CM.Env["nodes"]: thispat = (pat % node) watchpats.append(thispat) # Start all the nodes - one by one... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.ReturnOnlyMatch() watch.setwatch() for node in self.CM.Env["nodes"]: self.CM.StartaCM(node) if watch.lookforall(): return self.success() return self.failure("Did not find start pattern(s): " + repr(watch.unmatched)) def is_applicable(self): '''StartOnebyOne is always applicable''' return 1 # Register StartOnebyOne as a good test to run AllTestClasses.append(StartOnebyOne) ################################################################### class SimulStart(CTSTest): ################################################################### '''Start all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStart" def __call__(self, dummy): '''Perform the 'SimulStart' test. ''' self.incr("calls") # We ignore the "node" parameter... # Shut down all the nodes... for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] != self.CM["down"]: self.incr("stops") self.stop = StopTest(self.CM) self.stop(node) watchpats = [ ] pat = self.CM["Pat:We_started"] for node in self.CM.Env["nodes"]: thispat = (pat % node) watchpats.append(thispat) # Start all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.ReturnOnlyMatch() watch.setwatch() for node in self.CM.Env["nodes"]: self.CM.StartaCMnoBlock(node) if watch.lookforall(): return self.success() return self.failure("Did not find start pattern(s): " + repr(watch.unmatched)) def is_applicable(self): '''SimulStart is always applicable''' return 1 # Register SimulStart as a good test to run AllTestClasses.append(SimulStart) class SimulStop(CTSTest): ################################################################### '''Stop all the nodes ~ simultaneously''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="SimulStop" def __call__(self, dummy): '''Perform the 'SimulStop' test. ''' self.incr("calls") # We ignore the "node" parameter... # Start up all the nodes... for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] != self.CM["up"]: self.incr("started") self.start = StartTest(self.CM) self.start(node) watchpats = [ ] pat = self.CM["Pat:We_stopped"] for node in self.CM.Env["nodes"]: prefix = ('(%s) .* ' % node) thispat = prefix + pat watchpats.append(thispat) # Stop all the nodes - at about the same time... watch = CTS.LogWatcher(self.CM["LogFileName"], watchpats , timeout=self.CM["DeadTime"]+10) watch.ReturnOnlyMatch() watch.setwatch() for node in self.CM.Env["nodes"]: self.CM.StopaCM(node) if watch.lookforall(): return self.success() return self.failure("Did not find stop pattern(s): " + repr(watch.unmatched)) def is_applicable(self): '''SimulStop is always applicable''' return 1 # Register SimulStop as a good test to run AllTestClasses.append(SimulStop) ################################################################### class StandbyTest(CTSTest): ################################################################### '''Put a node in standby mode''' def __init__(self, cm): CTSTest.__init__(self,cm) self.name="standby" self.successpat = self.CM["Pat:StandbyOK"] self.nostandbypat = self.CM["Pat:StandbyNONE"] self.transient = self.CM["Pat:StandbyTRANSIENT"] def __call__(self, node): '''Perform the 'standby' test. ''' self.incr("calls") if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() if self.CM.upcount() < 2: self.incr("nostandby") pat = self.nostandbypat; else: self.incr("standby") pat = self.successpat; # # You could make a good argument that the cluster manager # ought to give us good clues on when its a bad time to # switch over to the other side, but heartbeat doesn't... # It could also queue the request. But, heartbeat # doesn't do that either :-) # retrycount=0 while (retrycount < 10): watch = CTS.LogWatcher(self.CM["LogFileName"] , [pat, self.transient] , timeout=self.CM["DeadTime"]+10) watch.setwatch() self.CM.rsh(node, self.CM["Standby"]) match = watch.look() if match: if re.search(self.transient, match): self.incr("retries") time.sleep(2); retrycount=retrycount+1 else: return self.success() else: break # No point in retrying... return self.failure("did not find pattern " + pat) def is_applicable(self): '''StandbyTest is applicable when the CM has a Standby command''' if not self.CM.has_key("Standby"): return None else: #if self.CM.Env.has_key("DoStandby"): #flag=self.CM.Env["DoStandby"] #if type(flag) == types.IntType: #return flag #if not re.match("[yt]", flag, re.I): #return None # # We need to strip off everything after the first blank # cmd=self.CM["Standby"]; cmd = cmd.split()[0] if not os.access(cmd, os.X_OK): return None cf = self.CM.cf if not cf.Parameters.has_key("auto_failback"): return None elif cf.Parameters["auto_failback"][0] == "legacy": return None return 1 # Register StandbyTest as a good test to run AllTestClasses.append(StandbyTest) ####################################################################### class Fastdetection(CTSTest): ####################################################################### '''Test the time which one node find out the other node is killed very quickly''' def __init__(self,cm,timeout=60): CTSTest.__init__(self, cm) self.name = "DetectionTime" self.they_stopped = self.CM["Pat:They_stopped"] self.timeout = timeout self.start = StartTest(cm) self.standby = StandbyTest(cm) self.__setitem__("min", 0) self.__setitem__("max", 0) self.__setitem__("totaltime", 0) def __call__(self, node): '''Perform the fastfailureDetection test''' self.incr("calls") if self.CM.ShouldBeStatus[node] != self.CM["up"]: ret=self.start(node) if not ret: return ret; if self.CM.upcount() < 2: return self.skipped() # Make sure they're not holding any resources ret = self.standby(node) if not ret: return ret; stoppat = (self.they_stopped % node) stopwatch = CTS.LogWatcher(self.CM["LogFileName"], [stoppat], timeout=self.timeout) stopwatch.setwatch() if self.CM.rsh(node, "killall -9 heartbeat")==0: Starttime = os.times()[4] if stopwatch.look(): Stoptime = os.times()[4] self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") Detectiontime = Stoptime-Starttime detectms = int(Detectiontime*1000+0.5) self.CM.log("...failure detection time: %d ms" % detectms) self.Stats["totaltime"] = self.Stats["totaltime"] + Detectiontime if self.Stats["min"] == 0: self.Stats["min"] = Detectiontime if Detectiontime > self.Stats["max"]: self.Stats["max"] = Detectiontime if Detectiontime < self.Stats["min"]: self.Stats["min"] = Detectiontime self.CM.ShouldBeStatus[node] = self.CM["down"] self.start(node) return self.success() else: self.CM.rsh(node, "killall -9 @libdir@/heartbeat/ccm @libdir@/heartbeat/ipfail >/dev/null 2>&1; true") self.CM.ShouldBeStatus[node] = self.CM["down"] ret=self.start(node) return self.failure("Didn't find the log message") else: return self.failure("Couldn't stop heartbeat") def is_applicable(self): '''This test is applicable when auto_failback != legacy''' return self.standby.is_applicable() AllTestClasses.append(Fastdetection) ############################################################################## class BandwidthTest(CTSTest): ############################################################################## # Tests should not be cluster-manager-specific # If you need to find out cluster manager configuration to do this, then # it should be added to the generic cluster manager API. '''Test the bandwidth which heartbeat uses''' def __init__(self, cm): CTSTest.__init__(self, cm) self.name = "Bandwidth" self.start = StartTest(cm) self.__setitem__("min",0) self.__setitem__("max",0) self.__setitem__("totalbandwidth",0) self.tempfile = tempfile.mktemp(".cts") def __call__(self, node): '''Perform the Bandwidth test''' self.incr("calls") if self.CM.upcount()<1: return self.skipped() Path = self.CM.InternalCommConfig() if "ip" not in Path["mediatype"]: return self.skipped() port = Path["port"][0] port = int(port) if self.CM.ShouldBeStatus[node] != self.CM["up"]: ret = self.start(node) if not ret: return ret time.sleep(5) # We get extra messages right after startup. fstmpfile = "@HA_VARRUNDIR@/band_estimate" dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \ % (port, fstmpfile); rc = self.CM.rsh(node, dumpcmd) if rc == 0: farfile = "root@%s:%s" % (node, fstmpfile) if self.CM.rsh.cp(farfile, self.tempfile): Bandwidth = self.countbandwidth(self.tempfile) if not Bandwidth: self.CM.log("Could not compute bandwidth.") return self.success() else: return self.failure("scp failure") intband = int(Bandwidth + 0.5) self.CM.log("...heartbeat bandwidth: %d bits/sec" % intband) self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth if self.Stats["min"] == 0: self.Stats["min"] = Bandwidth if Bandwidth > self.Stats["max"]: self.Stats["max"] = Bandwidth if Bandwidth < self.Stats["min"]: self.Stats["min"] = Bandwidth self.CM.rsh(node, "rm -f %s" % fstmpfile) os.unlink(self.tempfile) return self.success() else: return self.failure("no response from tcpdump command [%d]!" % rc) def countbandwidth(self, file): try: fp = open(file, "r") except IOError: print ("countbandwidth: Cannot open %s" % file) return None fp.seek(0) count = 0 sum = 0 while 1: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count=count+1 linesplit = string.split(line," ") for j in range(len(linesplit)-1): if linesplit[j]=="udp": break if linesplit[j]=="length:": break try: sum = sum + int(linesplit[j+1]) except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T1 = linesplit[0] timesplit = string.split(T1,":") time2split = string.split(timesplit[2],".") time1 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 break while count < 100: line = fp.readline() if not line: return None if re.search("udp",line) or re.search("UDP,", line): count = count+1 linessplit = string.split(line," ") for j in range(len(linessplit)-1): if linessplit[j] =="udp": break if linesplit[j]=="length:": break try: sum=int(linessplit[j+1])+sum except ValueError: self.CM.log("Invalid tcpdump line: %s" % line) return None T2 = linessplit[0] timesplit = string.split(T2,":") time2split = string.split(timesplit[2],".") time2 = (long(timesplit[0])*60+long(timesplit[1]))*60+long(time2split[0])+long(time2split[1])*0.000001 time = time2-time1 if (time <= 0): return 0 return (sum*8)/time def is_applicable(self): '''BandwidthTest is always applicable''' return 1 AllTestClasses.append(BandwidthTest) ########################################################################## class RedundantpathTest(CTSTest): ########################################################################## '''In heartbeat, it has redundant path to communicate between the cluster''' # # Tests should not be cluster-manager specific # One needs to isolate what you need from the cluster manager and then # add a (new) API to do it. # def __init__(self,cm,timeout=60): CTSTest.__init__(self,cm) self.name = "RedundantpathTest" self.timeout = timeout def PathCount(self): '''Return number of communication paths''' Path = self.CM.InternalCommConfig() cf = self.CM.cf eths = [] serials = [] num = 0 for interface in Path["interface"]: if re.search("eth",interface): eths.append(interface) num = num + 1 if re.search("/dev",interface): serials.append(interface) num = num + 1 return (num, eths, serials) def __call__(self,node): '''Perform redundant path test''' self.incr("calls") if self.CM.ShouldBeStatus[node]!=self.CM["up"]: return self.skipped() (num, eths, serials) = self.PathCount() for eth in eths: if self.CM.rsh(node,"ifconfig %s down" % eth)==0: PathDown = "OK" break if PathDown != "OK": for serial in serials: if self.CM.rsh(node,"setserial %s uart none" % serial)==0: PathDown = "OK" break if PathDown != "OK": return self.failure("Cannot break the path") time.sleep(self.timeout) for audit in CTSaudits.AuditList(self.CM): if not audit(): for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.failure("Redundant path fail") for eth in eths: self.CM.rsh(node,"ifconfig %s up" % eth) for serial in serials: self.CM.rsh(node,"setserial %s uart 16550" % serial) return self.success() def is_applicable(self): '''It is always applicable''' return self.PathCount()[0] > 1 #AllTestClasses.append(RedundantpathTest) ########################################################################## class DRBDTest(CTSTest): ########################################################################## '''In heartbeat, it provides replicated storage.''' def __init__(self,cm, timeout=10): CTSTest.__init__(self,cm) self.name = "DRBD" self.timeout = timeout def __call__(self, dummy): '''Perform the 'DRBD' test.''' self.incr("calls") for node in self.CM.Env["nodes"]: if self.CM.ShouldBeStatus[node] == self.CM["down"]: return self.skipped() # Note: All these special cases with Start/Stop/StatusDRBD # should be reworked to use resource objects instead of # being hardwired to bypass the objects here. for node in self.CM.Env["nodes"]: done=time.time()+self.timeout+1 while (time.time()done: return self.failure("Can't start drbd, please check it") device={} for node in self.CM.Env["nodes"]: device[node]=self.getdevice(node) node = self.CM.Env["nodes"][0] done=time.time()+self.timeout+1 while 1: if (time.time()>done): return self.failure("the drbd could't sync") self.CM.rsh(node,"cp /proc/drbd @HA_VARRUNDIR@ >/dev/null 2>&1") if self.CM.rsh.cp("%s:@HA_VARRUNDIR@/drbd" % node,"@HA_VARRUNDIR@"): line = open("/tmp/@HA_VARRUNDIR@").readlines()[2] p = line.find("Primary") s1 = line.find("Secondary") s2 = line.rfind("Secondary") if s1!=s2: if self.CM.rsh(node,"drbdsetup %s primary" % device[node]): pass if p!=-1: if p