#!/usr/bin/env python
# -*-python-*-
#
# Perform some simple regression tests. DEFINITELY not exhaustive.
#
#######################################################################
#
# CONFIGURE
#
# HOST and PORT are handed to davlib.DAV.__init__() by mydav and are also
# used to build BASE, the absolute-URL prefix for COPY/MOVE destinations.
HOST = 'kurgan.lyra.org'
PORT = 8080
# Added to the fixed 3-second sleep in timeout_test() before it checks that
# a 'Second-2' lock has gone away.  Only consulted when -t is given.
SLACK_TIMEOUT = 120
# END
#######################################################################
import davlib
import string
import sys
import getopt
import time
try:
import cStringIO
StringIO = cStringIO
except ImportError:
import StringIO
# Absolute-URL prefix of the server under test; copymove_test() prepends it
# to the destination paths it passes to copy_x()/move_x().
BASE = 'http://%s:%s' % (HOST, PORT)
# Namespace passed as `ns` when reading/writing the "executable" property.
MOD_DAV_PROPS = 'http://apache.org/dav/props/'
# Depth value passed to PROPFIND/LOCK requests that should cover a whole tree.
INFINITY = 'infinity'
class mydav(davlib.DAV):
    """davlib.DAV client bound to HOST:PORT with regression-test helpers.

    Every response that passes through _request() is tagged with the
    request ``method`` and ``url`` so the isNNN() checkers can say what
    failed.  The ``*_x`` methods issue a request and throw the body away.
    """

    def __init__(self, verbose=0):
        """Initialise the davlib.DAV base with HOST and PORT.

        verbose -- when true, every request is echoed with its status.
        """
        davlib.DAV.__init__(self, HOST, PORT)
        self.verbose = verbose

    def _request(self, method, url, *args, **kw):
        """Issue a request, tag the response and pre-parse known bodies.

        The response gets ``method`` and ``url`` attributes.  A 207 or 424
        response is passed through parse_multistatus(); a 200 response to
        LOCK is passed through parse_lock_response().
        """
        response = davlib.DAV._request(self, method, url, *args, **kw)
        response.method = method
        response.url = url
        if self.verbose:
            print('EXEC: %s{%s}. status: %d' %
                  (method, url, response.status))
        if response.status in (207, 424):
            response.parse_multistatus()
        elif method == 'LOCK' and response.status == 200:
            response.parse_lock_response()
        return response

    def _one_request(self, name, args, kw):
        "Perform one request and consume (throw away) the response."
        response = getattr(davlib.DAV, name)(self, *args, **kw)
        response.read()
        return response

    def delete_x(self, *args, **kw):
        "DELETE, discarding the response body."
        return self._one_request('delete', args, kw)

    def mkcol_x(self, *args, **kw):
        "MKCOL, discarding the response body."
        return self._one_request('mkcol', args, kw)

    def put_x(self, *args, **kw):
        "PUT, discarding the response body."
        return self._one_request('put', args, kw)

    def lock_x(self, *args, **kw):
        "LOCK, discarding the response body."
        return self._one_request('lock', args, kw)

    def unlock_x(self, *args, **kw):
        "UNLOCK, discarding the response body."
        return self._one_request('unlock', args, kw)

    def setprops_x(self, *args, **kw):
        "Set properties, discarding the response body."
        return self._one_request('setprops', args, kw)

    def copy_x(self, *args, **kw):
        "COPY, discarding the response body."
        return self._one_request('copy', args, kw)

    def move_x(self, *args, **kw):
        "MOVE, discarding the response body."
        return self._one_request('move', args, kw)

    def request_x(self, *args, **kw):
        "Perform a request and consume (throw away) the response."
        response = self._request(*args, **kw)
        response.read()
        return response

    def smart_props(self, url, depth=0, props=(), ns='', extra_frames=0):
        """Get some properties and preprocess the response.

        url          -- resource to PROPFIND
        depth        -- Depth value passed to propfind()
        props        -- local names of the properties to request; when
                        empty no request body is sent
        ns           -- namespace URI the requested names belong to
        extra_frames -- forwarded to caller_lineno() to pick the stack
                        frame whose line is reported on a 403

        Returns: { url: { (uri, localname) : value_elem } }
        An empty dict is returned when the response is not a 207.
        """
        if props:
            if ns:
                xmlns = ' xmlns:NS="%s"' % ns
                prefix = 'NS:'
            else:
                xmlns = prefix = ''
            # One empty element per requested name, wrapped in the
            # propfind/prop envelope.  The format needs exactly three
            # slots: the optional namespace declaration, the prefix of
            # the first name, and the remaining names joined so each one
            # closes the previous element and opens the next.
            body = davlib.XML_DOC_HEADER + (
                '<DAV:propfind xmlns:DAV="DAV:"%s><DAV:prop>'
                '<%s%s/></DAV:prop></DAV:propfind>' %
                (xmlns, prefix, ('/><' + prefix).join(props)))
        else:
            body = None

        response = self.propfind(url, body, depth)
        if response.status == 403:
            print('ERROR: Server may be refusing "Depth: infinity" '
                  'PROPFIND requests.')
            print('       For TESTING purposes (not production!), '
                  'this should be enabled.')
            print('       Failing operation at line %d.' %
                  caller_lineno(extra_frames))
            response.read()     # throw out the response body
            return {}
        if not is207(response):
            response.read()     # throw out the response body
            return {}

        propset = {}    # { url: { (uri, localname) : value_elem } }
        for resp in response.msr.responses:
            if resp.status and resp.status[0] != 200:
                print('ERROR: PROPFIND{%s} had a status of %d.' %
                      (url, resp.status[0]))
                continue

            # Merge every propstat of this response.  A 404 propstat is
            # tolerated and its entries are merged like any other.
            found = {}
            for ps in resp.propstat:
                status = ps.status and ps.status[0]
                if status and status != 200 and status != 404:
                    print('ERROR: PROPFIND{%s} had a status of %d.' %
                          (url, ps.status[0]))
                    break
                for key, value in ps.prop.items():
                    if key in found:
                        print('WARNING: PROPFIND{%s} returned a duplicate '
                              'property.' % url)
                    found[key] = value

            if len(resp.href) != 1:
                print('ERROR: PROPFIND{%s} returned multiple <href> elems '
                      'in a response.' % url)
                continue
            href = resp.href[0]
            if href in propset:
                print('ERROR: PROPFIND{%s} returned a duplicate <href>.'
                      % url)
                continue
            propset[href] = found
        return propset

    def get_locks(self, url, depth=0, extra_frames=0):
        """Returns { url : [tokens...] }

        Asks for DAV:lockdiscovery via smart_props().  URLs whose answer
        does not consist of exactly that one property are reported and
        left out of the result.
        """
        propset = self.smart_props(url, depth, ('lockdiscovery',), 'DAV:',
                                   extra_frames=extra_frames)
        locks = {}
        for key, props in propset.items():
            if len(props) != 1:
                print('ERROR: smart PROPFIND{%s} returned multiple props '
                      'for <%s>.' % (url, key))
                continue
            ld = props.get(('DAV:', 'lockdiscovery'))
            if not ld:
                print('ERROR: smart PROPFIND{%s} returned incorrect prop '
                      'for <%s>.' % (url, key))
                # nothing to gather tokens from; skip this URL
                continue
            locks[key] = gather_tokens(ld)
        return locks

    def smart_delete(self, url):
        """Delete a URL, unlocking resources if necessary.

        DELETE is retried until it answers 204 or 404.  After each failed
        attempt the URLs reported as locked (423) are unlocked with the
        tokens found through get_locks().  Gives up when a response is
        unusable or when a round unlocks nothing.
        """
        while 1:
            response = self.delete_x(url)
            if response.status in (204, 404):
                return

            locked = {}     # URLs which failed due to locks
            if response.status == 423:
                # the URL itself was locked
                locked[url] = None
            elif response.status == 424:
                # something under the URL had a problem (e.g. locks)
                pass
            elif not is207(response):
                print('ERROR: smart DELETE{%s} failed.' % url)
                return

            if not locked:
                # gather URLs which failed due to locks
                for resp in response.msr.responses:
                    if len(resp.href) != 1:
                        print('ERROR: DELETE{%s}: multiple <href> elems '
                              'in a response.' % url)
                        continue
                    href = resp.href[0]
                    if resp.status:
                        s = resp.status[0]
                        if s == 200:
                            continue
                        if s != 423:
                            print('ERROR: smart DELETE{%s}: unexpected '
                                  'status %d.' % (url, s))
                            continue
                        if href in locked:
                            print('ERROR: smart DELETE{%s}: duplicate '
                                  '<href>.' % url)
                            continue
                        locked[href] = None
                    # else what?

            # Sorted order puts a URL right before anything it prefixes,
            # so an entry is dropped when the last kept URL is its prefix.
            urls = []
            for candidate in sorted(locked):
                if urls and candidate.startswith(urls[-1]):
                    continue
                urls.append(candidate)

            unlocked_one = 0
            # get the lock information at each URL, then UNLOCK them
            for sub_url in urls:
                locks = self.get_locks(sub_url)
                if len(locks) > 1:
                    print('ERROR: get_locks{%s} returned props for '
                          'multiple URLs.' % sub_url)
                    continue
                elif not locks:
                    print('ERROR: get_locks{%s} returned no information.'
                          % sub_url)
                    continue
                # assume returned URL matches sub_url
                tokens = list(locks.values())[0]
                for token in tokens:
                    r = self.unlock_x(sub_url, token)
                    if is204(r):
                        unlocked_one = 1

            if not unlocked_one:
                # retrying would loop forever
                print('ERROR: nothing was unlocked; cannot DELETE.')
                return

    def validate_locks(self, url, tokens, verbose=0):
        """Compare the locks found under `url` with the expected ones.

        url     -- root of the Depth: infinity lockdiscovery PROPFIND
        tokens  -- { url : expected lock token }; not modified
        verbose -- when true, success is reported too

        Returns 1 if any discrepancy was reported, otherwise 0.
        """
        # make a local copy
        tokens = tokens.copy()
        locks = self.get_locks(url, INFINITY, extra_frames=2)
        err = 0
        for check_url, check_tokens in locks.items():
            if len(check_tokens) == 0:
                if check_url in tokens:
                    print('ERROR: <%s> has no locks, but we expected one.'
                          % check_url)
                    print('       expected: <%s>' % tokens[check_url])
                    err = 1
            elif len(check_tokens) == 1:
                expect = tokens.get(check_url)
                if not expect:
                    print('ERROR: <%s> has an unexpected lock.' % check_url)
                    print('       found: <%s>' % check_tokens[0])
                    err = 1
                elif expect != check_tokens[0]:
                    print('ERROR: <%s> has an incorrect token.' % check_url)
                    print('       expected: <%s>' % tokens[check_url])
                    print('       found: <%s>' % check_tokens[0])
                    err = 1
            else:
                print('ERROR: <%s> has more than one lock.' % check_url)
                expect = tokens.get(check_url)
                if expect:
                    print('       expected: <%s>' % expect)
                else:
                    print('       expected: none')
                for t in check_tokens:
                    print('       found: <%s>' % t)
                # this is reported as an ERROR, so it must fail the check
                err = 1

            # we've checked this URL, remove it from our expected tokens
            if check_url in tokens:
                del tokens[check_url]

        # if there are more expectations, then issue errors for each
        for expect_url, expect_token in tokens.items():
            print('ERROR: <%s> was not returned in the PROPFIND.'
                  % expect_url)
            print('       expected lock: <%s>' % expect_token)
            err = 1

        if err:
            print('validate: line %d: FAILED.' % caller_lineno())
        elif verbose:
            print('validate: line %d: successful.' % caller_lineno())
        return err
def gather_tokens(ld):
    """Return the lock tokens listed inside a lockdiscovery element.

    For every DAV:activelock child of `ld`, the text of each DAV:href
    found under its DAV:locktoken is collected, in document order.
    """
    found = []
    for child in ld.children:
        if not (child.ns == 'DAV:' and child.name == 'activelock'):
            continue
        token_elem = child.find('locktoken', 'DAV:')
        if not token_elem:
            continue
        found.extend([node.textof() for node in token_elem.children
                      if node.ns == 'DAV:' and node.name == 'href'])
    return found
def test_response(r, expected):
if r.status != expected:
print 'ERROR: line %d: %s{%s} returned %d. Expected %d.' % \
(caller_lineno(1), r.method, r.url, r.status, expected)
return 0
return 1
def is200(r):
    """Return 1 if `r` is a 200 (OK); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=200)
def is201(r):
    """Return 1 if `r` is a 201 (Created); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=201)
def is204(r):
    """Return 1 if `r` is a 204 (No Content); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=204)
def is207(r):
    """Return 1 if `r` is a 207 (Multistatus); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=207)
def is404(r):
    """Return 1 if `r` is a 404 (Not Found); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=404)
def is412(r):
    """Return 1 if `r` is a 412 (Precondition Failed); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=412)
def is423(r):
    """Return 1 if `r` is a 423 (Locked); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=423)
def is424(r):
    """Return 1 if `r` is a 424 (Failed Dependency); otherwise report it and return 0."""
    # called directly: test_response() counts stack frames for its report
    return test_response(r, expected=424)
def prep_if_test(conn):
    """Recreate the fixture tree used by the If: header tests.

    Removes /dav/locktest/iftest (unlocking as needed via smart_delete),
    then rebuilds it with a 'sub' collection holding file1 and file2.
    """
    top = '/dav/locktest/iftest'
    sub = top + '/sub'
    conn.smart_delete(top)
    for collection in (top, sub):
        conn.mkcol_x(collection)
    for leaf in ('file1', 'file2'):
        conn.put_x('%s/%s' % (sub, leaf), 'body of ' + leaf)
def lock_test(conn, verbose=0):
    """Exercise LOCK/UNLOCK and If: header handling under /dav/locktest.

    conn    -- a mydav connection
    verbose -- when true, progress and successful validations are printed

    Failures are reported by the isNNN() checkers and by
    conn.validate_locks(); nothing is returned.
    """
    if verbose:
        print('Beginning LOCK tests...')

    conn.smart_delete('/dav/locktest')
    conn.mkcol_x('/dav/locktest')
    conn.mkcol_x('/dav/locktest/sub')

    # { url : token } for every lock we expect the server to report
    tokens = {}
    conn.validate_locks('/dav/locktest/', tokens, verbose)

    # locknull test
    r = conn.lock_x('/dav/locktest/locknull')
    if is200(r):
        tkn = r.locktoken
        tokens['/dav/locktest/locknull'] = tkn
        if_hdr = '(<' + r.locktoken + '>)'

        # try a PUT to the locknull
        r = conn.put_x('/dav/locktest/locknull', 'the body')
        if is423(r):
            # try again with the right If: header
            r = conn.put_x('/dav/locktest/locknull', 'the body',
                           extra_hdrs={'If': if_hdr})
            if is201(r):
                # do it one more time!
                r = conn.put_x('/dav/locktest/locknull', 'the new body',
                               extra_hdrs={'If': if_hdr})
                is204(r)

        conn.validate_locks('/dav/locktest/', tokens, verbose)

        # can we unlock the locknull? (no locks on parent)
        r = conn.unlock_x('/dav/locktest/locknull', tkn)
        if is204(r):
            del tokens['/dav/locktest/locknull']

    # recreate the locknull, then depth=0 lock the parent, then unlock all
    r = conn.lock_x('/dav/locktest/locknull')
    if is200(r):
        tkn = r.locktoken
        r = conn.lock_x('/dav/locktest', depth=0)
        if is200(r):
            tkn2 = r.locktoken

            # removing the locknull changes the parent, so we need an If: hdr
            if_hdr = '(<' + tkn2 + '>)'
            r = conn.unlock_x('/dav/locktest/locknull', tkn,
                              extra_hdrs={'If': if_hdr})
            is204(r)

            r = conn.unlock_x('/dav/locktest', tkn2)
            is204(r)

    # Joe Orton's LOCK and PUT test: can a file be PUT into a locked
    # collection?
    r = conn.lock_x('/dav/locktest/sub')
    if is200(r):
        tkn = r.locktoken
        tokens['/dav/locktest/sub/'] = tkn

        if_hdr = '(<' + tkn + '>)'
        r = conn.put_x('/dav/locktest/sub/file', 'the body',
                       extra_hdrs={'If': if_hdr})
        is201(r)
        tokens['/dav/locktest/sub/file'] = tkn

        conn.validate_locks('/dav/locktest/', tokens, verbose)

    # time to start testing some If: headers

    # no-tag-list. lock 'iftest' and delete it.
    # should fail: parent does not have this locktoken
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest')
    if is200(r):
        if_hdr = '(<' + r.locktoken + '>)'
        r = conn.delete_x('/dav/locktest/iftest', extra_hdrs={'If': if_hdr})
        # the parent fails (not the Request-URI), so we get a 424
        is424(r)

    # no-tag-list. lock 'sub', delete 'iftest'.
    # should fail: parent does not have this locktoken.
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest/sub')
    if is200(r):
        if_hdr = '(<' + r.locktoken + '>)'
        r = conn.delete_x('/dav/locktest/iftest', extra_hdrs={'If': if_hdr})
        # the Request-URI fails the precondition, so a 412 is okay
        is412(r)

    # no-tag-list (two options). lock 'sub', delete 'iftest'.
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest/sub')
    if is200(r):
        if_hdr = '(<%s>) (Not <%s>)' % (r.locktoken, r.locktoken)
        r = conn.delete_x('/dav/locktest/iftest', extra_hdrs={'If': if_hdr})
        is204(r)

    # no-tag-list (two options). lock both dirs. delete 'sub'.
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest', depth=0)
    if is200(r):
        tkn1 = r.locktoken
        r = conn.lock_x('/dav/locktest/iftest/sub')
        if is200(r):
            tkn2 = r.locktoken
            if_hdr = '(<%s>) (<%s>)' % (tkn1, tkn2)
            r = conn.delete_x('/dav/locktest/iftest/sub',
                              extra_hdrs={'If': if_hdr})
            is204(r)

    # tagged-list. lock both dirs. delete 'sub'
    prep_if_test(conn)
    r = conn.lock_x('/dav/locktest/iftest', depth=0)
    if is200(r):
        tkn1 = r.locktoken
        r = conn.lock_x('/dav/locktest/iftest/sub')
        if is200(r):
            tkn2 = r.locktoken
            # Each list is tagged with the absolute URL of the resource
            # its token belongs to; without the tags this case would just
            # repeat the no-tag-list request above.
            if_hdr = '<%s/dav/locktest/iftest> (<%s>) ' \
                     '<%s/dav/locktest/iftest/sub> (<%s>)' % \
                     (BASE, tkn1, BASE, tkn2)
            r = conn.delete_x('/dav/locktest/iftest/sub',
                              extra_hdrs={'If': if_hdr})
            is204(r)
def timeout_test(conn, verbose=0):
    """Check that a short-lived lock disappears after it times out.

    Takes a Depth: infinity lock with timeout 'Second-2' on /dav/timeout,
    sleeps 3 + SLACK_TIMEOUT seconds, then validates that neither the
    child nor the collection itself still reports a lock.
    """
    if verbose:
        print('Beginning timeout tests...')

    # lock timeouts
    root = '/dav/timeout'
    child = root + '/sub'
    conn.mkcol_x(root)
    conn.mkcol_x(child)

    lock_response = conn.lock_x(root, depth=INFINITY, timeout='Second-2')
    if not is200(lock_response):
        return

    if SLACK_TIMEOUT > 5:
        print('SLACK_TIMEOUT is %d. Now sleeping past the slack time...' %
              SLACK_TIMEOUT)
    time.sleep(3 + SLACK_TIMEOUT)

    # FIRST: see if the indirect lock timed out
    conn.validate_locks(child, {}, verbose)

    # now see if the direct lock timed out
    conn.validate_locks(root, {}, verbose)
def patch_executable(conn, value, expect_code, expect_value):
    """PROPPATCH the "executable" property and verify the outcome.

    conn         -- a mydav connection
    value        -- value to set on /dav/proppatch_test
    expect_code  -- status expected in the first propstat of the response
    expect_value -- value a following PROPFIND should report

    Mismatches are printed; nothing is returned.
    """
    r = conn.setprops_x('/dav/proppatch_test', executable=value,
                        ns=MOD_DAV_PROPS)
    if is207(r):
        status = r.msr.responses[0].propstat[0].status[0]
        if status != expect_code:
            # report against expect_code, the value just compared with
            print('ERROR: PROPPATCH "executable" with "%s" returned %d. '
                  'Expected %d.' % (value, status, expect_code))

    # read the property back, whatever the PROPPATCH answered
    propset = conn.smart_props('/dav/proppatch_test', 0, ('executable',),
                               MOD_DAV_PROPS)
    if not propset:
        print('ERROR: PROPFIND returned no "executable" information.')
    else:
        props = list(propset.values())[0]
        prop_value = props[(MOD_DAV_PROPS, 'executable')].textof()
        if prop_value != expect_value:
            print('ERROR: PROPPATCH "executable" with "%s". Became "%s". '
                  'Expected "%s".' % (value, prop_value, expect_value))
# Contents of the dead properties written by proppatch_test().  The value at
# index N is looked up again for the property whose name is 'prop' followed
# by N when the PROPFIND result is validated.
# NOTE(review): four entries are the identical string 'hello'; presumably
# they once differed (e.g. in embedded markup) -- confirm against upstream.
_prop_values = [
'hi there',
'',
'hello',
'hello',
'hello',
'hello',
]
def proppatch_test(conn, verbose=0):
    """Exercise PROPPATCH on /dav/proppatch_test.

    First drives the "executable" property through accepted and rejected
    values, then stores one dead property per _prop_values entry (named
    prop0, prop1, ...) and checks what a PROPFIND returns for them.

    conn    -- a mydav connection
    verbose -- when true, progress and ignored properties are printed
    """
    if verbose:
        print('Beginning PROPPATCH tests...')

    conn.delete_x('/dav/proppatch_test')
    conn.put_x('/dav/proppatch_test', 'test body')

    # test the executable property
    patch_executable(conn, 'T', 200, 'T')
    patch_executable(conn, 'F', 200, 'F')
    patch_executable(conn, 'x', 409, 'F')
    patch_executable(conn, 'TF', 409, 'F')
    patch_executable(conn, 'FT', 409, 'F')
    patch_executable(conn, 'T', 200, 'T')
    patch_executable(conn, 'F', 200, 'F')
    patch_executable(conn, 'TF', 409, 'F')
    patch_executable(conn, 'TF', 409, 'F')
    patch_executable(conn, 'TF', 409, 'F')

    # set some dead properties: one <propN> element per value, matching
    # the 'prop' + index naming that the validation below decodes
    props = []
    for i in range(len(_prop_values)):
        props.append('<prop%d>%s</prop%d>' % (i, _prop_values[i], i))
    r = conn.setprops_x('/dav/proppatch_test', *props)
    is207(r)

    # validate them now
    propset = conn.smart_props('/dav/proppatch_test')
    if not propset:
        # smart_props() returns {} when the PROPFIND was unusable
        print('ERROR: PROPFIND returned no properties to validate.')
        return
    props = list(propset.values())[0]
    for (ns, name), elem in props.items():
        if ns == 'DAV:' or ns == MOD_DAV_PROPS:
            # ignore these props
            if verbose:
                print('ignoring: %s %s' % (ns, name))
            continue
        if ns != '':
            print('ERROR: PROPPATCH test expected empty NS for <%s>' % name)

        idx = -1
        if name[:4] == 'prop':
            try:
                idx = int(name[4:])
            except ValueError:
                # 'prop' followed by a non-number is not one of ours
                idx = -1
        if idx < 0 or idx >= len(_prop_values):
            print('ERROR: unknown property (%s) found' % name)
            continue

        s = StringIO.StringIO()
        davlib.qp_xml.dump(s, elem)
        got = s.getvalue()
        # NOTE(review): assumes qp_xml.dump() writes an
        # '<?xml version="1.0"?>' line before the element and a newline
        # after it -- confirm against the qp_xml in use.
        expect = '<?xml version="1.0"?>\n<%s>%s</%s>\n' % \
                 (name, _prop_values[idx], name)
        if got != expect:
            print('ERROR: <%s> returned "%s". Expected "%s".' %
                  (name, got, expect))
def copymove_test(conn, verbose=0):
    """Exercise COPY and MOVE under /dav/cm_test and check the layout.

    Builds a small tree, copies and moves files and collections around
    (each step must answer 201), then compares the URLs returned by a
    Depth: infinity PROPFIND with the expected listing.
    """
    if verbose:
        print('Beginning COPY/MOVE tests...')

    root = '/dav/cm_test'
    dest = BASE + root      # destinations are given as absolute URLs

    conn.delete_x(root)
    conn.mkcol_x(root)
    conn.put_x(root + '/file1', 'body of file1')
    conn.put_x(root + '/file2', 'body of file2 is longer')
    conn.mkcol_x(root + '/subdir1')

    # kept as separate statements so a failure reports its own line
    is201(conn.copy_x(root + '/file1', dest + '/file3'))
    is201(conn.copy_x(root + '/file3', dest + '/subdir1/file3'))
    is201(conn.copy_x(root + '/subdir1', dest + '/subdir2'))
    is201(conn.move_x(root + '/file1', dest + '/file1b'))
    is201(conn.move_x(root + '/subdir2', dest + '/subdir2b'))
    is201(conn.move_x(root + '/subdir2b/file3', dest + '/file4'))

    #
    # The directory layout should be:
    #
    #   cm_test/
    #     file1b
    #     file2
    #     file3
    #     file4
    #     subdir1/
    #       file3
    #     subdir2b/
    #
    desired = [root + suffix for suffix in (
        '/',
        '/file1b',
        '/file2',
        '/file3',
        '/file4',
        '/subdir1/',
        '/subdir1/file3',
        '/subdir2b/',
    )]

    found = conn.smart_props(root, INFINITY, ('getcontentlength',), 'DAV:')
    actual = sorted(found)
    if actual != desired:
        print('ERROR: desired and actual directories do not match.')
    elif verbose:
        ### verify length?
        print('COPY/MOVE tests complete.')
def caller_lineno(n=0):
    """Return the line number being executed in a calling stack frame.

    With n == 0 this is the frame two levels above this function, i.e.
    the caller of the function that called caller_lineno().

    The optional parameter specifies how many extra stack frames to go back.

    Raises ValueError if the call stack is not that deep.
    """
    # Frame 0 is this function and frame 1 is whoever called it.  Asking
    # the interpreter for the frame avoids raising a throwaway exception
    # just to obtain a traceback.
    return sys._getframe(n + 2).f_lineno
def main():
    """Parse the command line and run the regression tests.

    Options:
      -v   verbose output
      -t   also run the lock timeout tests
    """
    opts, _positional = getopt.getopt(sys.argv[1:], 'vt')
    # neither option takes a value, so the flag name alone identifies it
    flags = [flag for flag, _value in opts]
    verbose = '-v' in flags
    do_timeout = '-t' in flags

    conn = mydav(verbose)

    lock_test(conn, verbose)
    if do_timeout:
        timeout_test(conn, verbose)
    elif verbose:
        print('Skipping timeout tests.')
    proppatch_test(conn, verbose)
    copymove_test(conn, verbose)


if __name__ == '__main__':
    main()