#!/usr/bin/env python
# -*-python-*-
#
# Perform some simple regression tests. DEFINITELY not exhaustive.
#
# Usage: run with -v for verbose output and -t to include the (slow)
# lock-timeout tests.  Failures are reported on stdout as "ERROR: ..."
# lines; nothing is raised for a failed check.
#

#######################################################################
#
# CONFIGURE
#

HOST = 'kurgan.lyra.org'
PORT = 8080

# Extra seconds timeout_test() sleeps beyond the requested lock timeout
# before checking that the lock is gone.
SLACK_TIMEOUT = 120

# END
#######################################################################

import davlib
import string
import sys
import getopt
import time

try:
    import cStringIO
    StringIO = cStringIO
except ImportError:
    try:
        import StringIO
    except ImportError:
        # Neither Python 2 module is available; io offers the same
        # StringIO() / getvalue() interface used below.
        import io as StringIO

BASE = 'http://%s:%s' % (HOST, PORT)

MOD_DAV_PROPS = 'http://apache.org/dav/props/'

INFINITY = 'infinity'


class mydav(davlib.DAV):
    """A davlib.DAV connection to HOST:PORT with test-oriented helpers.

    Every response returned through _request() carries two extra
    attributes, .method and .url, which test_response() uses in its
    error messages.
    """

    def __init__(self, verbose=0):
        davlib.DAV.__init__(self, HOST, PORT)
        self.verbose = verbose

    def _request(self, method, url, *args, **kw):
        """Issue a request, tag the response and pre-parse known bodies.

        207 and 424 responses are parsed as multistatus; a 200 response
        to LOCK is parsed as a lock response.
        """
        response = davlib.DAV._request(self, method, url, *args, **kw)
        response.method = method
        response.url = url
        if self.verbose:
            print('EXEC: %s{%s}. status: %d'
                  % (method, url, response.status))
        if response.status == 207 or response.status == 424:
            response.parse_multistatus()
        elif method == 'LOCK' and response.status == 200:
            response.parse_lock_response()
        return response

    def _one_request(self, name, args, kw):
        "Perform one request and consume (throw away) the response."
        response = getattr(davlib.DAV, name)(self, *args, **kw)
        response.read()
        return response

    # The *_x variants run the davlib method of the same name and discard
    # the response body so the connection can be reused immediately.

    def delete_x(self, *args, **kw):
        return self._one_request('delete', args, kw)

    def mkcol_x(self, *args, **kw):
        return self._one_request('mkcol', args, kw)

    def put_x(self, *args, **kw):
        return self._one_request('put', args, kw)

    def lock_x(self, *args, **kw):
        return self._one_request('lock', args, kw)

    def unlock_x(self, *args, **kw):
        return self._one_request('unlock', args, kw)

    def setprops_x(self, *args, **kw):
        return self._one_request('setprops', args, kw)

    def copy_x(self, *args, **kw):
        return self._one_request('copy', args, kw)

    def move_x(self, *args, **kw):
        return self._one_request('move', args, kw)

    def request_x(self, *args, **kw):
        "Perform a request and consume (throw away) the response."
        response = self._request(*args, **kw)
        response.read()
        return response

    def smart_props(self, url, depth=0, props=(), ns='', extra_frames=0):
        """Get some properties and preprocess the response.

        props is a sequence of property local names, all in namespace
        ns; when props is empty no request body is sent.  extra_frames
        is passed to caller_lineno() so the 403 diagnostic can point at
        the right caller.

        Returns: { url: { (uri, localname) : value_elem } }
        An empty dict is returned when the response is not a 207.
        """
        if props:
            if ns:
                xmlns = ' xmlns:NS="%s"' % ns
                prefix = 'NS:'
            else:
                xmlns = prefix = ''
            # Three values go into the template: the optional namespace
            # declaration, then the prefix and the joined names, which
            # together expand to one empty element per property.
            body = davlib.XML_DOC_HEADER + \
                '<D:propfind xmlns:D="DAV:"%s><D:prop>' \
                '<%s%s/></D:prop></D:propfind>' % \
                (xmlns, prefix, ('/><' + prefix).join(props))
        else:
            body = None

        response = self.propfind(url, body, depth)
        if response.status == 403:
            print('ERROR: Server may be refusing "Depth: infinity" '
                  'PROPFIND requests.')
            print(' For TESTING purposes (not production!), '
                  'this should be enabled.')
            print(' Failing operation at line %d.'
                  % caller_lineno(extra_frames))
            response.read()     # throw out the response body
            return {}
        if not is207(response):
            response.read()     # throw out the response body
            return {}

        propset = {}    # { url: { (uri, localname) : value_elem } }
        for resp in response.msr.responses:
            if resp.status and resp.status[0] != 200:
                print('ERROR: PROPFIND{%s} had a status of %d.'
                      % (url, resp.status[0]))
                continue

            found = {}
            for ps in resp.propstat:
                status = ps.status and ps.status[0]
                # 404 is tolerated: the property simply is not defined.
                if status and status != 200 and status != 404:
                    print('ERROR: PROPFIND{%s} had a status of %d.'
                          % (url, ps.status[0]))
                    break
                for key, value in ps.prop.items():
                    if key in found:
                        print('WARNING: PROPFIND{%s} returned a '
                              'duplicate property.' % url)
                    found[key] = value

            if len(resp.href) != 1:
                print('ERROR: PROPFIND{%s} returned multiple <href> '
                      'elems in a response.' % url)
                continue
            href = resp.href[0]
            if href in propset:
                print('ERROR: PROPFIND{%s} returned a duplicate <href>.'
                      % url)
                continue
            propset[href] = found

        return propset

    def get_locks(self, url, depth=0, extra_frames=0):
        "Returns { url : [tokens...] }"
        propset = self.smart_props(url, depth, ('lockdiscovery',), 'DAV:',
                                   extra_frames=extra_frames)
        locks = {}
        for key, props in propset.items():
            if len(props) != 1:
                print('ERROR: smart PROPFIND{%s} returned multiple props '
                      'for <%s>.' % (url, key))
                continue
            ld = props.get(('DAV:', 'lockdiscovery'))
            if not ld:
                print('ERROR: smart PROPFIND{%s} returned incorrect prop '
                      'for <%s>.' % (url, key))
                # nothing to gather tokens from
                continue
            locks[key] = gather_tokens(ld)
        return locks

    def smart_delete(self, url):
        """Delete a URL, unlocking resources if necessary.

        DELETE is retried after every round of UNLOCKs.  The method
        gives up (after printing an error) on an unexpected status or
        when a round unlocks nothing.
        """
        while 1:
            locked = {}     # URLs which failed due to locks

            response = self.delete_x(url)
            if response.status == 204 or response.status == 404:
                return
            if response.status == 423:
                # the URL itself was locked
                locked[url] = None
            elif response.status == 424:
                # something under the URL had a problem (e.g. locks)
                pass
            elif not is207(response):
                print('ERROR: smart DELETE{%s} failed.' % url)
                return

            if not locked:
                # gather URLs which failed due to locks
                for resp in response.msr.responses:
                    if len(resp.href) != 1:
                        print('ERROR: DELETE{%s}: multiple <href> elems '
                              'in a response.' % url)
                        continue
                    href = resp.href[0]
                    if resp.status:
                        s = resp.status[0]
                        if s == 200:
                            continue
                        if s != 423:
                            print('ERROR: smart DELETE{%s}: unexpected '
                                  'status %d.' % (url, s))
                            continue
                        if href in locked:
                            print('ERROR: smart DELETE{%s}: duplicate '
                                  '<href>.' % url)
                            continue
                        locked[href] = None
                    # else what?

            urls = sorted(locked)
            if len(urls) > 1:
                # After sorting, a collection precedes its members, so
                # drop every URL that has an earlier URL as a prefix.
                prev = urls[0]
                i = 1
                while i < len(urls):
                    if prev == urls[i][:len(prev)]:
                        # previous is a prefix; torch the internal member
                        del urls[i]
                    else:
                        # move on to the next URL
                        prev = urls[i]
                        i = i + 1

            unlocked_one = 0

            # get the lock information at each URL, then UNLOCK them
            for sub_url in urls:
                locks = self.get_locks(sub_url)
                if len(locks) > 1:
                    print('ERROR: get_locks{%s} returned props for '
                          'multiple URLs.' % sub_url)
                    continue
                elif not locks:
                    print('ERROR: get_locks{%s} returned no information.'
                          % sub_url)
                    continue

                # assume returned URL matches sub_url
                tokens = list(locks.values())[0]
                for token in tokens:
                    r = self.unlock_x(sub_url, token)
                    if is204(r):
                        unlocked_one = 1

            if not unlocked_one:
                print('ERROR: nothing was unlocked; cannot DELETE.')
                return

    def validate_locks(self, url, tokens, verbose=0):
        """Compare the locks found under url against the expected tokens.

        tokens maps URL -> expected lock token and is not modified.
        Discrepancies are printed.  Returns 1 if any were found, else 0.
        """
        # make a local copy
        expected = tokens.copy()

        locks = self.get_locks(url, INFINITY, extra_frames=2)
        err = 0
        for check_url, check_tokens in locks.items():
            if len(check_tokens) == 0:
                if check_url in expected:
                    print('ERROR: <%s> has no locks, but we expected one.'
                          % check_url)
                    print(' expected: <%s>' % expected[check_url])
                    err = 1
            elif len(check_tokens) == 1:
                expect = expected.get(check_url)
                if not expect:
                    print('ERROR: <%s> has an unexpected lock.' % check_url)
                    print(' found: <%s>' % check_tokens[0])
                    err = 1
                elif expect != check_tokens[0]:
                    print('ERROR: <%s> has an incorrect token.' % check_url)
                    print(' expected: <%s>' % expected[check_url])
                    print(' found: <%s>' % check_tokens[0])
                    err = 1
            else:
                print('ERROR: <%s> has more than one lock.' % check_url)
                # This is reported as an error, so it must fail the
                # validation as well.
                err = 1
                expect = expected.get(check_url)
                if expect:
                    print(' expected: <%s>' % expect)
                else:
                    print(' expected: none')
                for t in check_tokens:
                    print(' found: <%s>' % t)

            # we've checked this URL, remove it from our expected tokens
            if check_url in expected:
                del expected[check_url]

        # if there are more expectations, then issue errors for each
        for expect_url, expect_token in expected.items():
            print('ERROR: <%s> was not returned in the PROPFIND.'
                  % expect_url)
            print(' expected lock: <%s>' % expect_token)
            err = 1

        if err:
            print('validate: line %d: FAILED.' % caller_lineno())
        elif verbose:
            print('validate: line %d: successful.' % caller_lineno())

        return err


def gather_tokens(ld):
    """Return the lock tokens listed in a DAV:lockdiscovery element.

    Collects the text of every DAV:href inside the DAV:locktoken of each
    DAV:activelock child of ld.
    """
    tokens = []
    for active in ld.children:
        if active.ns != 'DAV:' or active.name != 'activelock':
            continue
        locktoken = active.find('locktoken', 'DAV:')
        if locktoken:
            for href in locktoken.children:
                if href.ns == 'DAV:' and href.name == 'href':
                    tokens.append(href.textof())
    return tokens


def test_response(r, expected):
    """Return 1 if r.status equals expected; else print an error, return 0.

    The reported line number is that of the code which called the isNNN()
    wrapper, hence the one extra frame passed to caller_lineno().
    """
    if r.status != expected:
        print('ERROR: line %d: %s{%s} returned %d. Expected %d.'
              % (caller_lineno(1), r.method, r.url, r.status, expected))
        return 0
    return 1


def is200(r):
    "Is it a 200 (OK) ?"
    return test_response(r, 200)


def is201(r):
    "Is it a 201 (Created) ?"
    return test_response(r, 201)


def is204(r):
    "Is it a 204 (No Content) ?"
    return test_response(r, 204)


def is207(r):
    "Is it a 207 (Multistatus) ?"
    return test_response(r, 207)


def is404(r):
    "Is it a 404 (Not Found) ?"
    return test_response(r, 404)


def is412(r):
    "Is it a 412 (Precondition Failed) ?"
    return test_response(r, 412)


def is423(r):
    "Is it a 423 (Locked) ?"
    return test_response(r, 423)


def is424(r):
    "Is it a 424 (Failed Dependency) ?"
    return test_response(r, 424)


def prep_if_test(conn):
    "Recreate the /dav/locktest/iftest tree used by the If: header tests."
    conn.smart_delete('/dav/locktest/iftest')
    conn.mkcol_x('/dav/locktest/iftest')
    conn.mkcol_x('/dav/locktest/iftest/sub')
    conn.put_x('/dav/locktest/iftest/sub/file1', 'body of file1')
    conn.put_x('/dav/locktest/iftest/sub/file2', 'body of file2')


def lock_test(conn, verbose=0):
    "Exercise LOCK/UNLOCK, locknull resources and If: header handling."
    if verbose:
        print('Beginning LOCK tests...')

    conn.smart_delete('/dav/locktest')
    conn.mkcol_x('/dav/locktest')
    conn.mkcol_x('/dav/locktest/sub')

    tokens = {}
    conn.validate_locks('/dav/locktest/', tokens, verbose)

    # locknull test
    r = conn.lock_x('/dav/locktest/locknull')
    if is200(r):
        tkn = r.locktoken
        tokens['/dav/locktest/locknull'] = tkn
        if_hdr = '(<' + r.locktoken + '>)'

        # try a PUT to the locknull
        r = conn.put_x('/dav/locktest/locknull', 'the body')
        if is423(r):
            # try again with the right If: header
            r = conn.put_x('/dav/locktest/locknull', 'the body',
                           extra_hdrs={'If': if_hdr})
            if is201(r):
                # do it one more time!
                r = conn.put_x('/dav/locktest/locknull', 'the new body',
                               extra_hdrs={'If': if_hdr})
                is204(r)

        conn.validate_locks('/dav/locktest/', tokens, verbose)

        # can we unlock the locknull? (no locks on parent)
        r = conn.unlock_x('/dav/locktest/locknull', tkn)
        if is204(r):
            del tokens['/dav/locktest/locknull']

    # recreate the locknull, then depth=0 lock the parent, then unlock all
    r = conn.lock_x('/dav/locktest/locknull')
    if is200(r):
        tkn = r.locktoken
        r = conn.lock_x('/dav/locktest', depth=0)
        if is200(r):
            tkn2 = r.locktoken
            # removing the locknull changes the parent, so we need an
            # If: hdr
            if_hdr = '(<' + tkn2 + '>)'
            r = conn.unlock_x('/dav/locktest/locknull', tkn,
                              extra_hdrs={'If': if_hdr})
            is204(r)
            r = conn.unlock_x('/dav/locktest', tkn2)
            is204(r)

    # Joe Orton's LOCK and PUT test: can a file be PUT into a locked
    # collection?
    r = conn.lock_x('/dav/locktest/sub')
    if is200(r):
        tkn = r.locktoken
        tokens['/dav/locktest/sub/'] = tkn
        if_hdr = '(<' + tkn + '>)'
        r = conn.put_x('/dav/locktest/sub/file', 'the body',
                       extra_hdrs={'If': if_hdr})
        is201(r)
        tokens['/dav/locktest/sub/file'] = tkn

    conn.validate_locks('/dav/locktest/', tokens, verbose)

    # time to start testing some If: headers

    # no-tag-list. lock 'iftest' and delete it.
    # should fail: parent does not have this locktoken
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest')
    if is200(r):
        if_hdr = '(<' + r.locktoken + '>)'
        r = conn.delete_x('/dav/locktest/iftest', extra_hdrs={'If': if_hdr})
        # the parent fails (not the Request-URI), so we get a 424
        is424(r)

    # no-tag-list. lock 'sub', delete 'iftest'.
    # should fail: parent does not have this locktoken.
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest/sub')
    if is200(r):
        if_hdr = '(<' + r.locktoken + '>)'
        r = conn.delete_x('/dav/locktest/iftest', extra_hdrs={'If': if_hdr})
        # the Request-URI fails the precondition, so a 412 is okay
        is412(r)

    # no-tag-list (two options). lock 'sub', delete 'iftest'.
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest/sub')
    if is200(r):
        if_hdr = '(<%s>) (Not <%s>)' % (r.locktoken, r.locktoken)
        r = conn.delete_x('/dav/locktest/iftest', extra_hdrs={'If': if_hdr})
        is204(r)

    # no-tag-list (two options). lock both dirs. delete 'sub'.
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest', depth=0)
    if is200(r):
        tkn1 = r.locktoken
        r = conn.lock_x('/dav/locktest/iftest/sub')
        if is200(r):
            tkn2 = r.locktoken
            if_hdr = '(<%s>) (<%s>)' % (tkn1, tkn2)
            r = conn.delete_x('/dav/locktest/iftest/sub',
                              extra_hdrs={'If': if_hdr})
            is204(r)

    # tagged-list. lock both dirs. delete 'sub'
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest', depth=0)
    if is200(r):
        tkn1 = r.locktoken
        r = conn.lock_x('/dav/locktest/iftest/sub')
        if is200(r):
            tkn2 = r.locktoken
            # Each token is tagged with the resource it applies to,
            # which is what makes this a tagged list rather than a
            # repeat of the previous test.
            # NOTE(review): absolute resource URLs built from BASE are
            # assumed here -- confirm against the server's expectations.
            if_hdr = '<%s/dav/locktest/iftest> (<%s>) ' \
                     '<%s/dav/locktest/iftest/sub> (<%s>)' % \
                     (BASE, tkn1, BASE, tkn2)
            r = conn.delete_x('/dav/locktest/iftest/sub',
                              extra_hdrs={'If': if_hdr})
            is204(r)


def timeout_test(conn, verbose=0):
    "Take a 2-second lock and verify it has expired after the slack time."
    if verbose:
        print('Beginning timeout tests...')

    # lock timeouts
    conn.mkcol_x('/dav/timeout')
    conn.mkcol_x('/dav/timeout/sub')
    r = conn.lock_x('/dav/timeout', depth=INFINITY, timeout='Second-2')
    if is200(r):
        if SLACK_TIMEOUT > 5:
            print('SLACK_TIMEOUT is %d. Now sleeping past the slack time...'
                  % SLACK_TIMEOUT)
        time.sleep(3 + SLACK_TIMEOUT)

        # FIRST: see if the indirect lock timed out
        conn.validate_locks('/dav/timeout/sub', {}, verbose)

        # now see if the direct lock timed out
        conn.validate_locks('/dav/timeout', {}, verbose)


def patch_executable(conn, value, expect_code, expect_value):
    """PROPPATCH the mod_dav "executable" property and check the outcome.

    value is the value to set, expect_code the expected propstat status
    and expect_value the value the property should hold afterwards.
    """
    r = conn.setprops_x('/dav/proppatch_test', executable=value,
                        ns=MOD_DAV_PROPS)
    if is207(r):
        status = r.msr.responses[0].propstat[0].status[0]
        if status != expect_code:
            print('ERROR: PROPPATCH "executable" with "%s" returned %d. '
                  'Expected %d.' % (value, status, expect_code))

    propset = conn.smart_props('/dav/proppatch_test', 0,
                               ('executable',), MOD_DAV_PROPS)
    elem = None
    if propset:
        elem = list(propset.values())[0].get((MOD_DAV_PROPS, 'executable'))
    if elem is None:
        print('ERROR: PROPFIND returned no "executable" information.')
    else:
        prop_value = elem.textof()
        if prop_value != expect_value:
            print('ERROR: PROPPATCH "executable" with "%s". Became "%s". '
                  'Expected "%s".' % (value, prop_value, expect_value))


# Values stored as the dead properties prop0..propN by proppatch_test().
# NOTE(review): four identical 'hello' entries look like values whose
# markup was lost at some point -- confirm the intended contents.
_prop_values = [
    'hi there',
    '',
    'hello',
    'hello',
    'hello',
    'hello',
]


def proppatch_test(conn, verbose=0):
    "Exercise PROPPATCH with the live 'executable' prop and dead props."
    if verbose:
        print('Beginning PROPPATCH tests...')

    conn.delete_x('/dav/proppatch_test')
    conn.put_x('/dav/proppatch_test', 'test body')

    # test the executable property
    patch_executable(conn, 'T', 200, 'T')
    patch_executable(conn, 'F', 200, 'F')
    patch_executable(conn, 'x', 409, 'F')
    patch_executable(conn, 'TF', 409, 'F')
    patch_executable(conn, 'FT', 409, 'F')
    patch_executable(conn, 'T', 200, 'T')
    patch_executable(conn, 'F', 200, 'F')
    patch_executable(conn, 'TF', 409, 'F')
    patch_executable(conn, 'TF', 409, 'F')
    patch_executable(conn, 'TF', 409, 'F')

    # set some dead properties
    # Element names are prop<index>; the validation below parses the
    # index back out of the name.
    props = ['<prop%d>%s</prop%d>' % (i, value, i)
             for i, value in enumerate(_prop_values)]
    r = conn.setprops_x('/dav/proppatch_test', *props)
    is207(r)

    # validate them now
    propset = conn.smart_props('/dav/proppatch_test')
    if not propset:
        print('ERROR: PROPFIND returned no properties to validate.')
        return
    found = list(propset.values())[0]
    for (ns, name), elem in found.items():
        if ns == 'DAV:' or ns == MOD_DAV_PROPS:
            # ignore these props
            if verbose:
                print('ignoring: %s %s' % (ns, name))
            continue
        if ns != '':
            print('ERROR: PROPPATCH test expected empty NS for <%s>' % name)
        if name[:4] != 'prop':
            print('ERROR: unknown property (%s) found' % name)
        else:
            idx = int(name[4:])
            if idx < 0 or idx >= len(_prop_values):
                print('ERROR: unknown property (%s) found' % name)
            else:
                s = StringIO.StringIO()
                davlib.qp_xml.dump(s, elem)
                got = s.getvalue()
                # NOTE(review): assumes qp_xml.dump() emits an XML
                # declaration line before the element -- confirm.
                expect = '<?xml version="1.0"?>\n<%s>%s</%s>\n' % \
                         (name, _prop_values[idx], name)
                if got != expect:
                    print('ERROR: <%s> returned "%s". Expected "%s".'
                          % (name, got, expect))


def copymove_test(conn, verbose=0):
    "Exercise COPY and MOVE, then compare the resulting tree by PROPFIND."
    if verbose:
        print('Beginning COPY/MOVE tests...')

    conn.delete_x('/dav/cm_test')
    conn.mkcol_x('/dav/cm_test')
    conn.put_x('/dav/cm_test/file1', 'body of file1')
    conn.put_x('/dav/cm_test/file2', 'body of file2 is longer')
    conn.mkcol_x('/dav/cm_test/subdir1')

    r = conn.copy_x('/dav/cm_test/file1', BASE + '/dav/cm_test/file3')
    is201(r)
    r = conn.copy_x('/dav/cm_test/file3',
                    BASE + '/dav/cm_test/subdir1/file3')
    is201(r)
    r = conn.copy_x('/dav/cm_test/subdir1', BASE + '/dav/cm_test/subdir2')
    is201(r)
    r = conn.move_x('/dav/cm_test/file1', BASE + '/dav/cm_test/file1b')
    is201(r)
    r = conn.move_x('/dav/cm_test/subdir2', BASE + '/dav/cm_test/subdir2b')
    is201(r)
    r = conn.move_x('/dav/cm_test/subdir2b/file3',
                    BASE + '/dav/cm_test/file4')
    is201(r)

    #
    # The directory layout should be:
    #
    # cm_test/
    #   file1b
    #   file2
    #   file3
    #   file4
    #   subdir1/
    #     file3
    #   subdir2b/
    #
    desired = [
        '/dav/cm_test/',
        '/dav/cm_test/file1b',
        '/dav/cm_test/file2',
        '/dav/cm_test/file3',
        '/dav/cm_test/file4',
        '/dav/cm_test/subdir1/',
        '/dav/cm_test/subdir1/file3',
        '/dav/cm_test/subdir2b/',
    ]

    props = conn.smart_props('/dav/cm_test', INFINITY,
                             ('getcontentlength',), 'DAV:')
    actual = sorted(props.keys())
    if actual != desired:
        print('ERROR: desired and actual directories do not match.')
    else:
        ### verify length?
        if verbose:
            print('COPY/MOVE tests complete.')


def caller_lineno(n=0):
    """Return the line number currently executing in our caller's caller.

    The optional parameter specifies how many extra stack frames to go
    back.
    """
    frame = sys._getframe()     # this function's own frame
    for _ in range(n):
        frame = frame.f_back
    return frame.f_back.f_back.f_lineno


def main():
    "Parse -v (verbose) and -t (run timeout tests), then run the tests."
    opts, args = getopt.getopt(sys.argv[1:], 'vt')
    verbose = ('-v', '') in opts
    do_timeout = ('-t', '') in opts

    conn = mydav(verbose)

    lock_test(conn, verbose)
    if do_timeout:
        timeout_test(conn, verbose)
    elif verbose:
        print('Skipping timeout tests.')
    proppatch_test(conn, verbose)
    copymove_test(conn, verbose)


if __name__ == '__main__':
    main()