#
#  actions.py:  routines that actually run the svn client.
#
#  Subversion is a tool for revision control.
#  See http://subversion.tigris.org for more information.
#
# ====================================================================
# Copyright (c) 2000-2007 CollabNet.  All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.  The terms
# are also available at http://subversion.tigris.org/license-1.html.
# If newer versions of this license are posted there, you may use a
# newer version instead, at your option.
#
######################################################################

import os, shutil, re, sys, errno
import difflib, pprint
import xml.parsers.expat

import main, verify, tree, wc  # general svntest routines in this module.
from svntest import Failure

def no_sleep_for_timestamps():
  os.environ['SVN_SLEEP_FOR_TIMESTAMPS'] = 'no'

def do_sleep_for_timestamps():
  os.environ['SVN_SLEEP_FOR_TIMESTAMPS'] = 'yes'

def setup_pristine_repository():
  """Create the pristine repository and 'svn import' the greek tree"""

  # these directories don't exist out of the box, so we may have to create them
  if not os.path.exists(main.general_wc_dir):
    os.makedirs(main.general_wc_dir)

  if not os.path.exists(main.general_repo_dir):
    os.makedirs(main.general_repo_dir) # this also creates all the intermediate dirs

  # If there's no pristine repos, create one.
  if not os.path.exists(main.pristine_dir):
    main.create_repos(main.pristine_dir)

    # if this is dav, gives us access rights to import the greek tree.
    if main.is_ra_type_dav():
      authz_file = os.path.join(main.work_dir, "authz")
      main.file_write(authz_file, "[/]\n* = rw\n")

    # dump the greek tree to disk.
    main.greek_state.write_to_disk(main.greek_dump_dir)

    # import the greek tree, using l:foo/p:bar
    ### todo: svn should not be prompting for auth info when using
    ### repositories with no auth/auth requirements
    output, errput = main.run_svn(None, 'import',
                                  '--username', main.wc_author,
                                  '--password', main.wc_passwd,
                                  '-m', 'Log message for revision 1.',
                                  main.greek_dump_dir, main.pristine_url)

    # check for any errors from the import
    if len(errput):
      display_lines("Errors during initial 'svn import':",
                    'STDERR', None, errput)
      sys.exit(1)

    # verify the printed output of 'svn import'.
    lastline = output.pop().strip()
    cm = re.compile ("(Committed|Imported) revision [0-9]+.")
    match = cm.search (lastline)
    if not match:
      print "ERROR:  import did not succeed, while creating greek repos."
      print "The final line from 'svn import' was:"
      print lastline
      sys.exit(1)
    output_tree = tree.build_tree_from_commit(output)

    ### due to path normalization in the .old_tree() method, we cannot
    ### prepend the necessary '.' directory. thus, let's construct an old
    ### tree manually from the greek_state.
    output_list = []
    for greek_path in main.greek_state.desc.keys():
      output_list.append([ os.path.join(main.greek_dump_dir, greek_path),
                           None, {}, {'verb' : 'Adding'}])
    expected_output_tree = tree.build_generic_tree(output_list)

    try:
      tree.compare_trees(output_tree, expected_output_tree)
    except tree.SVNTreeUnequal:
      verify.display_trees("ERROR:  output of import command is unexpected.",
                           "OUTPUT TREE", expected_output_tree, output_tree)
      sys.exit(1)


######################################################################
# Used by every test, so that they can run independently of  one
# another. Every time this routine is called, it recursively copies
# the `pristine repos' to a new location.
# Note: make sure setup_pristine_repository was called once before
# using this function.

def guarantee_greek_repository(path):
  """Guarantee that a local svn repository exists at PATH, containing
  nothing but the greek-tree at revision 1."""

  if path == main.pristine_dir:
    print "ERROR:  attempt to overwrite the pristine repos!  Aborting."
    sys.exit(1)

  # copy the pristine repository to PATH.
  main.safe_rmtree(path)
  if main.copy_repos(main.pristine_dir, path, 1):
    print "ERROR:  copying repository failed."
    sys.exit(1)

  # make the repos world-writeable, for mod_dav_svn's sake.
  main.chmod_tree(path, 0666, 0666)


def run_and_verify_svnversion(message, wc_dir, repo_url,
                              expected_stdout, expected_stderr):
  "Run svnversion command and check its output"
  out, err = main.run_svnversion(wc_dir, repo_url)
  verify.verify_outputs("Unexpected output", out, err,
                        expected_stdout, expected_stderr)
  return out, err


def run_and_verify_svn(message, expected_stdout, expected_stderr, *varargs):
  """Invokes main.run_svn() with *VARARGS, return stdout and stderr as
  lists of lines.  For both EXPECTED_STDOUT and EXPECTED_STDERR,
  create an appropriate instance of verify.ExpectedOutput (if necessary):

     - If it is an array of strings, create a vanilla ExpectedOutput.

     - If it is a single string, create a RegexOutput.

     - If it is already an instance of ExpectedOutput
       (e.g. UnorderedOutput), leave it alone.

  ...and invoke compare_and_display_lines() on MESSAGE, a label based
  on the name of the stream being compared (e.g. STDOUT), the
  ExpectedOutput instance, and the actual output.

  If EXPECTED_STDOUT is None, do not check stdout.
  EXPECTED_STDERR may not be None.

  If a comparison fails, a Failure will be raised."""

  if expected_stderr is None:
    raise verify.SVNIncorrectDatatype("expected_stderr must not be None")

  want_err = None
  if expected_stderr is not None and expected_stderr is not []:
    want_err = True

  out, err = main.run_svn(want_err, *varargs)
  verify.verify_outputs(message, out, err, expected_stdout, expected_stderr)
  return out, err


def run_and_verify_load(repo_dir, dump_file_content):
  "Runs 'svnadmin load' and reports any errors."
  expected_stderr = []
  output, errput = \
          main.run_command_stdin(main.svnadmin_binary,
    expected_stderr, 1, dump_file_content,
    'load', '--force-uuid', '--quiet', repo_dir)
  verify.verify_outputs("Unexpected stderr output", None, errput,
                        None, expected_stderr)


def run_and_verify_dump(repo_dir):
  "Runs 'svnadmin dump' and reports any errors, returning the dump content."
  output, errput = main.run_svnadmin('dump', repo_dir)
  verify.verify_outputs("Missing expected output(s)", output, errput,
                        verify.AnyOutput, verify.AnyOutput)
  return output


def load_repo(sbox, dumpfile_path = None, dump_str = None):
  "Loads the dumpfile into sbox"
  if not dump_str:
    dump_str = main.file_read(dumpfile_path, "rb")

  # Create a virgin repos and working copy
  main.safe_rmtree(sbox.repo_dir, 1)
  main.safe_rmtree(sbox.wc_dir, 1)
  main.create_repos(sbox.repo_dir)

  # Load the mergetracking dumpfile into the repos, and check it out the repo
  run_and_verify_load(sbox.repo_dir, dump_str)
  run_and_verify_svn(None, None, [], "co", sbox.repo_url, sbox.wc_dir)

  return dump_str


######################################################################
# Subversion Actions
#
# These are all routines that invoke 'svn' in particular ways, and
# then verify the results by comparing expected trees with actual
# trees.
#
# For all the functions below, the OUTPUT_TREE and DISK_TREE args need
# to be created by feeding carefully constructed lists to
# tree.build_generic_tree().  A STATUS_TREE can be built by
# hand, or by editing the tree returned by get_virginal_state().


def run_and_verify_checkout(URL, wc_dir_name, output_tree, disk_tree,
                            singleton_handler_a = None,
                            a_baton = None,
                            singleton_handler_b = None,
                            b_baton = None,
                            *args):
  """Checkout the URL into a new directory WC_DIR_NAME. *ARGS are any
  extra optional args to the checkout subcommand.

  The subcommand output will be verified against OUTPUT_TREE,
  and the working copy itself will be verified against DISK_TREE.
  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will be passed to
  tree.compare_trees - see that function's doc string for more details.
  Returns if successful and raise on failure.

  WC_DIR_NAME is deleted if present unless the '--force' option is passed
  in *ARGS."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()
  if isinstance(disk_tree, wc.State):
    disk_tree = disk_tree.old_tree()

  # Remove dir if it's already there, unless this is a forced checkout.
  # In that case assume we want to test a forced checkout's toleration
  # of obstructing paths.
  remove_wc = True
  for arg in args:
    if arg == '--force':
      remove_wc = False
      break
  if remove_wc:
    main.safe_rmtree(wc_dir_name)

  # Checkout and make a tree of the output, using l:foo/p:bar
  ### todo: svn should not be prompting for auth info when using
  ### repositories with no auth/auth requirements
  output, errput = main.run_svn (None, 'co',
                                 '--username', main.wc_author,
                                 '--password', main.wc_passwd,
                                 URL, wc_dir_name, *args)
  actual = tree.build_tree_from_checkout (output)

  # Verify actual output against expected output.
  tree.compare_trees (actual, output_tree)

  # Create a tree by scanning the working copy
  actual = tree.build_tree_from_wc (wc_dir_name)

  # Verify expected disk against actual disk.
  tree.compare_trees (actual, disk_tree,
                      singleton_handler_a, a_baton,
                      singleton_handler_b, b_baton)


def run_and_verify_export(URL, export_dir_name, output_tree, disk_tree,
                          singleton_handler_a = None,
                          a_baton = None,
                          singleton_handler_b = None,
                          b_baton = None,
                          *args):
  """Export the URL into a new directory WC_DIR_NAME.

  The subcommand output will be verified against OUTPUT_TREE,
  and the exported copy itself will be verified against DISK_TREE.
  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will be passed to
  tree.compare_trees - see that function's doc string for more details.
  Returns if successful and raise on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()
  if isinstance(disk_tree, wc.State):
    disk_tree = disk_tree.old_tree()

  # Export and make a tree of the output, using l:foo/p:bar
  ### todo: svn should not be prompting for auth info when using
  ### repositories with no auth/auth requirements
  output, errput = main.run_svn (None, 'export',
                                 '--username', main.wc_author,
                                 '--password', main.wc_passwd,
                                 URL, export_dir_name, *args)
  actual = tree.build_tree_from_checkout (output)

  # Verify actual output against expected output.
  tree.compare_trees (actual, output_tree)

  # Create a tree by scanning the working copy.  Don't ignore
  # the .svn directories so that we generate an error if they
  # happen to show up.
  actual = tree.build_tree_from_wc (export_dir_name, ignore_svn=False)

  # Verify expected disk against actual disk.
  tree.compare_trees (actual, disk_tree,
                      singleton_handler_a, a_baton,
                      singleton_handler_b, b_baton)


# run_and_verify_log_xml

class LogEntry:
  def __init__(self, revision, changed_paths=None, revprops=None):
    self.revision = revision
    if changed_paths == None:
      self.changed_paths = {}
    else:
      self.changed_paths = changed_paths
    if revprops == None:
      self.revprops = {}
    else:
      self.revprops = revprops

  def assert_changed_paths(self, changed_paths):
    """Not implemented, so just raises svntest.Failure.
    """
    raise Failure('NOT IMPLEMENTED')

  def assert_revprops(self, revprops):
    """Assert that the dict revprops is the same as this entry's revprops.

    Raises svntest.Failure if not.
    """
    if self.revprops != revprops:
      raise Failure('\n' + '\n'.join(difflib.ndiff(
            pprint.pformat(revprops).splitlines(),
            pprint.pformat(self.revprops).splitlines())))

class LogParser:
  def parse(self, data):
    """Return a list of LogEntrys parsed from the sequence of strings data.

    This is the only method of interest to callers.
    """
    try:
      for i in data:
        self.parser.Parse(i)
      self.parser.Parse('', True)
    except xml.parsers.expat.ExpatError, e:
      raise verify.SVNUnexpectedStdout('%s\n%s\n' % (e, ''.join(data),))
    return self.entries

  def __init__(self):
    # for expat
    self.parser = xml.parsers.expat.ParserCreate()
    self.parser.StartElementHandler = self.handle_start_element
    self.parser.EndElementHandler = self.handle_end_element
    self.parser.CharacterDataHandler = self.handle_character_data
    # Ignore some things.
    self.ignore_elements('log', 'paths', 'path', 'revprops')
    self.ignore_tags('logentry_end', 'author_start', 'date_start', 'msg_start')
    # internal state
    self.cdata = []
    self.property = None
    # the result
    self.entries = []

  def ignore(self, *args, **kwargs):
    del self.cdata[:]
  def ignore_tags(self, *args):
    for tag in args:
      setattr(self, tag, self.ignore)
  def ignore_elements(self, *args):
    for element in args:
      self.ignore_tags(element + '_start', element + '_end')

  # expat handlers
  def handle_start_element(self, name, attrs):
    getattr(self, name + '_start')(attrs)
  def handle_end_element(self, name):
    getattr(self, name + '_end')()
  def handle_character_data(self, data):
    self.cdata.append(data)

  # element handler utilities
  def use_cdata(self):
    result = ''.join(self.cdata).strip()
    del self.cdata[:]
    return result
  def svn_prop(self, name):
    self.entries[-1].revprops['svn:' + name] = self.use_cdata()

  # element handlers
  def logentry_start(self, attrs):
    self.entries.append(LogEntry(int(attrs['revision'])))
  def author_end(self):
    self.svn_prop('author')
  def msg_end(self):
    self.svn_prop('log')
  def date_end(self):
    # svn:date could be anything, so just note its presence.
    self.cdata[:] = ['']
    self.svn_prop('date')
  def property_start(self, attrs):
    self.property = attrs['name']
  def property_end(self):
    self.entries[-1].revprops[self.property] = self.use_cdata()

def run_and_verify_log_xml(message=None, expected_paths=None,
                           expected_revprops=None, expected_stdout=None,
                           expected_stderr=None, args=[]):
  """Call run_and_verify_svn with log --xml and args (optional) as command
  arguments, and pass along message, expected_stdout, and expected_stderr.

  If message is None, pass the svn log command as message.

  expected_paths checking is not yet implemented.

  expected_revprops is an optional list of dicts, compared to each
  revision's revprops.  The list must be in the same order the log entries
  come in.  Any svn:date revprops in the dicts must be '' in order to
  match, as the actual dates could be anything.

  expected_paths and expected_revprops are ignored if expected_stdout or
  expected_stderr is specified.
  """
  if message == None:
    message = ' '.join(args)

  # We'll parse the output unless the caller specifies expected_stderr or
  # expected_stdout for run_and_verify_svn.
  parse = True
  if expected_stderr == None:
    expected_stderr = []
  else:
    parse = False
  if expected_stdout != None:
    parse = False

  log_args = list(args)
  if expected_paths != None:
    log_args.append('-v')

  (stdout, stderr) = run_and_verify_svn(
    message, expected_stdout, expected_stderr,
    'log', '--xml', *log_args)
  if not parse:
    return

  for (index, entry) in enumerate(LogParser().parse(stdout)):
    if expected_revprops != None:
      entry.assert_revprops(expected_revprops[index])
    if expected_paths != None:
      entry.assert_changed_paths(expected_paths[index])


def verify_update(actual_output, wc_dir_name,
                  output_tree, disk_tree, status_tree,
                  singleton_handler_a, a_baton,
                  singleton_handler_b, b_baton,
                  check_props):
  """Verify update of WC_DIR_NAME.

  The subcommand output (found in ACTUAL_OUTPUT) will be verified
  against OUTPUT_TREE, and the working copy itself will be verified
  against DISK_TREE.  If optional STATUS_TREE is given, then
  'svn status' output will be compared.  (This is a good way to check
  that revision numbers were bumped.)  SINGLETON_HANDLER_A and
  SINGLETON_HANDLER_B will be passed to tree.compare_trees - see that
  function's doc string for more details.  If CHECK_PROPS is set, then
  disk comparison will examine props.  Returns if successful, raises
  on failure."""

  # Verify actual output against expected output.
  tree.compare_trees (actual_output, output_tree)

  # Create a tree by scanning the working copy
  actual_disk = tree.build_tree_from_wc (wc_dir_name, check_props)

  # Verify expected disk against actual disk.
  tree.compare_trees (actual_disk, disk_tree,
                      singleton_handler_a, a_baton,
                      singleton_handler_b, b_baton)

  # Verify via 'status' command too, if possible.
  if status_tree:
    run_and_verify_status(wc_dir_name, status_tree)


def run_and_verify_update(wc_dir_name,
                          output_tree, disk_tree, status_tree,
                          error_re_string = None,
                          singleton_handler_a = None,
                          a_baton = None,
                          singleton_handler_b = None,
                          b_baton = None,
                          check_props = False,
                          *args):

  """Update WC_DIR_NAME.  *ARGS are any extra optional args to the
  update subcommand.  NOTE: If *ARGS is specified at all, explicit
  target paths must be passed in *ARGS as well (or a default `.' will
  be chosen by the 'svn' binary).  This allows the caller to update
  many items in a single working copy dir, but still verify the entire
  working copy dir.

  If ERROR_RE_STRING, the update must exit with error, and the error
  message must match regular expression ERROR_RE_STRING.

  Else if ERROR_RE_STRING is None, then:

  The subcommand output will be verified against OUTPUT_TREE, and the
  working copy itself will be verified against DISK_TREE.  If optional
  STATUS_TREE is given, then 'svn status' output will be compared.
  (This is a good way to check that revision numbers were bumped.)
  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will be passed to
  tree.compare_trees - see that function's doc string for more
  details.

  If CHECK_PROPS is set, then disk comparison will examine props.
  Returns if successful, raises on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()
  if isinstance(disk_tree, wc.State):
    disk_tree = disk_tree.old_tree()
  if isinstance(status_tree, wc.State):
    status_tree = status_tree.old_tree()

  # Update and make a tree of the output.
  if len(args):
    output, errput = main.run_svn (error_re_string, 'up', *args)
  else:
    output, errput = main.run_svn (error_re_string, 'up', wc_dir_name, *args)

  if (error_re_string):
    rm = re.compile(error_re_string)
    for line in errput:
      match = rm.search(line)
      if match:
        return
    raise main.SVNUnmatchedError

  actual = tree.build_tree_from_checkout (output)
  verify_update (actual, wc_dir_name,
                 output_tree, disk_tree, status_tree,
                 singleton_handler_a, a_baton,
                 singleton_handler_b, b_baton,
                 check_props)


def run_and_verify_merge(dir, rev1, rev2, url,
                         output_tree, disk_tree, status_tree, skip_tree,
                         error_re_string = None,
                         singleton_handler_a = None,
                         a_baton = None,
                         singleton_handler_b = None,
                         b_baton = None,
                         check_props = False,
                         dry_run = True,
                         *args):
  """Run 'svn merge -rREV1:REV2 URL DIR', leaving off the '-r'
  argument if both REV1 and REV2 are None."""
  if args:
    run_and_verify_merge2(dir, rev1, rev2, url, None, output_tree, disk_tree,
                          status_tree, skip_tree, error_re_string,
                          singleton_handler_a, a_baton, singleton_handler_b,
                          b_baton, check_props, dry_run, *args)
  else:
    run_and_verify_merge2(dir, rev1, rev2, url, None, output_tree, disk_tree,
                          status_tree, skip_tree, error_re_string,
                          singleton_handler_a, a_baton, singleton_handler_b,
                          b_baton, check_props, dry_run)


def run_and_verify_merge2(dir, rev1, rev2, url1, url2,
                          output_tree, disk_tree, status_tree, skip_tree,
                          error_re_string = None,
                          singleton_handler_a = None,
                          a_baton = None,
                          singleton_handler_b = None,
                          b_baton = None,
                          check_props = False,
                          dry_run = True,
                          *args):
  """Run 'svn merge URL1@REV1 URL2@REV2 DIR' if URL2 is not None
  (for a three-way merge between URLs and WC).

  If URL2 is None, run 'svn merge -rREV1:REV2 URL1 DIR'.  If both REV1
  and REV2 are None, leave off the '-r' argument.

  If ERROR_RE_STRING, the merge must exit with error, and the error
  message must match regular expression ERROR_RE_STRING.

  Else if ERROR_RE_STRING is None, then:

  The subcommand output will be verified against OUTPUT_TREE, and the
  working copy itself will be verified against DISK_TREE.  If optional
  STATUS_TREE is given, then 'svn status' output will be compared.
  The 'skipped' merge output will be compared to SKIP_TREE.
  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will be passed to
  tree.compare_trees - see that function's doc string for more
  details.

  If CHECK_PROPS is set, then disk comparison will examine props.

  If DRY_RUN is set then a --dry-run merge will be carried out first and
  the output compared with that of the full merge.

  Returns if successful, raises on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()
  if isinstance(disk_tree, wc.State):
    disk_tree = disk_tree.old_tree()
  if isinstance(status_tree, wc.State):
    status_tree = status_tree.old_tree()
  if isinstance(skip_tree, wc.State):
    skip_tree = skip_tree.old_tree()

  merge_command = [ "merge" ]
  if url2:
    merge_command.extend((url1 + "@" + str(rev1), url2 + "@" + str(rev2)))
  else:
    if not (rev1 is None and rev2 is None):
      merge_command.append("-r" + str(rev1) + ":" + str(rev2))
    merge_command.append(url1)
  merge_command.append(dir)
  merge_command = tuple(merge_command)

  if dry_run:
    pre_disk = tree.build_tree_from_wc(dir)
    dry_run_command = merge_command + ('--dry-run',)
    dry_run_command = dry_run_command + args
    out_dry, err_dry = main.run_svn(error_re_string, *dry_run_command)
    post_disk = tree.build_tree_from_wc(dir)
    try:
      tree.compare_trees(post_disk, pre_disk)
    except tree.SVNTreeError:
      print "============================================================="
      print "Dry-run merge altered working copy"
      print "============================================================="
      raise


  # Update and make a tree of the output.
  merge_command = merge_command + args
  out, err = main.run_svn (error_re_string, *merge_command)

  if error_re_string:
    if not error_re_string.startswith(".*"):
      error_re_string = ".*(" + error_re_string + ")"
    expected_err = verify.RegexOutput(error_re_string, match_all=False)
    verify.verify_outputs(None, None, err, None, expected_err)
    return
  elif err:
    raise verify.SVNUnexpectedStderr(err)

  if dry_run and out != out_dry:
    print "============================================================="
    print "Merge outputs differ"
    print "The dry-run merge output:"
    map(sys.stdout.write, out_dry)
    print "The full merge output:"
    map(sys.stdout.write, out)
    print "============================================================="
    raise main.SVNUnmatchedError

  def missing_skip(a, b):
    print "============================================================="
    print "Merge failed to skip: " + a.path
    print "============================================================="
    raise Failure
  def extra_skip(a, b):
    print "============================================================="
    print "Merge unexpectedly skipped: " + a.path
    print "============================================================="
    raise Failure

  myskiptree = tree.build_tree_from_skipped(out)
  tree.compare_trees(myskiptree, skip_tree,
                     extra_skip, None, missing_skip, None)

  actual = tree.build_tree_from_checkout(out, 0)
  verify_update (actual, dir,
                 output_tree, disk_tree, status_tree,
                 singleton_handler_a, a_baton,
                 singleton_handler_b, b_baton,
                 check_props)


def run_and_verify_switch(wc_dir_name,
                          wc_target,
                          switch_url,
                          output_tree, disk_tree, status_tree,
                          error_re_string = None,
                          singleton_handler_a = None,
                          a_baton = None,
                          singleton_handler_b = None,
                          b_baton = None,
                          check_props = False,
                          *args):

  """Switch WC_TARGET (in working copy dir WC_DIR_NAME) to SWITCH_URL.

  If ERROR_RE_STRING, the switch must exit with error, and the error
  message must match regular expression ERROR_RE_STRING.

  Else if ERROR_RE_STRING is None, then:

  The subcommand output will be verified against OUTPUT_TREE, and the
  working copy itself will be verified against DISK_TREE.  If optional
  STATUS_TREE is given, then 'svn status' output will be
  compared.  (This is a good way to check that revision numbers were
  bumped.)  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will be passed to
  tree.compare_trees - see that function's doc string for more details.
  If CHECK_PROPS is set, then disk comparison will examine props.
  Returns if successful, raises on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()
  if isinstance(disk_tree, wc.State):
    disk_tree = disk_tree.old_tree()
  if isinstance(status_tree, wc.State):
    status_tree = status_tree.old_tree()

  # Update and make a tree of the output.
  output, errput = main.run_svn (error_re_string, 'switch',
                                 '--username', main.wc_author,
                                 '--password', main.wc_passwd,
                                 switch_url, wc_target, *args)

  if error_re_string:
    if not error_re_string.startswith(".*"):
      error_re_string = ".*(" + error_re_string + ")"
    expected_err = verify.RegexOutput(error_re_string, match_all=False)
    verify.verify_outputs(None, None, errput, None, expected_err)
    return
  elif errput:
    raise verify.SVNUnexpectedStderr(err)

  actual = tree.build_tree_from_checkout (output)

  verify_update (actual, wc_dir_name,
                 output_tree, disk_tree, status_tree,
                 singleton_handler_a, a_baton,
                 singleton_handler_b, b_baton,
                 check_props)


def run_and_verify_commit(wc_dir_name, output_tree, status_tree,
                          error_re_string = None,
                          singleton_handler_a = None,
                          a_baton = None,
                          singleton_handler_b = None,
                          b_baton = None,
                          *args):
  """Commit and verify results within working copy WC_DIR_NAME,
  sending ARGS to the commit subcommand.

  The subcommand output will be verified against OUTPUT_TREE.  If
  optional STATUS_TREE is given, then 'svn status' output will
  be compared.  (This is a good way to check that revision numbers
  were bumped.)

  If ERROR_RE_STRING is None, the commit must not exit with error.  If
  ERROR_RE_STRING is a string, the commit must exit with error, and
  the error message must match regular expression ERROR_RE_STRING.

  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will be passed to
  tree.compare_trees - see that function's doc string for more
  details.  Returns if successful, raises on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()
  if isinstance(status_tree, wc.State):
    status_tree = status_tree.old_tree()

  # Commit.
  output, errput = main.run_svn(error_re_string, 'ci',
                                '--username', main.wc_author,
                                '--password', main.wc_passwd,
                                '-m', 'log msg',
                                *args)

  if error_re_string:
    if not error_re_string.startswith(".*"):
      error_re_string = ".*(" + error_re_string + ")"
    expected_err = verify.RegexOutput(error_re_string, match_all=False)
    verify.verify_outputs(None, None, errput, None, expected_err)
    return

  # Else not expecting error:

  # Remove the final output line, and verify that the commit succeeded.
  lastline = ""
  if len(output):
    lastline = output.pop().strip()

    cm = re.compile("(Committed|Imported) revision [0-9]+.")
    match = cm.search(lastline)
    if not match:
      print "ERROR:  commit did not succeed."
      print "The final line from 'svn ci' was:"
      print lastline
      raise main.SVNCommitFailure

  # The new 'final' line in the output is either a regular line that
  # mentions {Adding, Deleting, Sending, ...}, or it could be a line
  # that says "Transmitting file data ...".  If the latter case, we
  # want to remove the line from the output; it should be ignored when
  # building a tree.
  if len(output):
    lastline = output.pop()

    tm = re.compile("Transmitting file data.+")
    match = tm.search(lastline)
    if not match:
      # whoops, it was important output, put it back.
      output.append(lastline)

  # Convert the output into a tree.
  actual = tree.build_tree_from_commit (output)

  # Verify actual output against expected output.
  try:
    tree.compare_trees (actual, output_tree)
  except tree.SVNTreeError:
      verify.display_trees("Output of commit is unexpected",
                           "OUTPUT TREE", output_tree, actual)
      raise

  # Verify via 'status' command too, if possible.
  if status_tree:
    run_and_verify_status(wc_dir_name, status_tree)


# This function always passes '-q' to the status command, which
# suppresses the printing of any unversioned or nonexistent items.
def run_and_verify_status(wc_dir_name, output_tree,
                          singleton_handler_a = None,
                          a_baton = None,
                          singleton_handler_b = None,
                          b_baton = None):
  """Run 'status' on WC_DIR_NAME and compare it with the
  expected OUTPUT_TREE.  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will
  be passed to tree.compare_trees - see that function's doc string for
  more details.
  Returns on success, raises on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()

  output, errput = main.run_svn (None, 'status', '-v', '-u', '-q',
                                 '--username', main.wc_author,
                                 '--password', main.wc_passwd,
                                 wc_dir_name)

  actual = tree.build_tree_from_status (output)

  # Verify actual output against expected output.
  try:
    tree.compare_trees (actual, output_tree,
                        singleton_handler_a, a_baton,
                        singleton_handler_b, b_baton)
  except tree.SVNTreeError:
    verify.display_trees(None, 'STATUS OUTPUT TREE', output_tree, actual)
    raise


# A variant of previous func, but doesn't pass '-q'.  This allows us
# to verify unversioned or nonexistent items in the list.
def run_and_verify_unquiet_status(wc_dir_name, output_tree,
                                  singleton_handler_a = None,
                                  a_baton = None,
                                  singleton_handler_b = None,
                                  b_baton = None):
  """Run 'status' on WC_DIR_NAME and compare it with the
  expected OUTPUT_TREE.  SINGLETON_HANDLER_A and SINGLETON_HANDLER_B will
  be passed to tree.compare_trees - see that function's doc string for
  more details.
  Returns on success, raises on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()

  output, errput = main.run_svn (None, 'status', '-v', '-u', wc_dir_name)

  actual = tree.build_tree_from_status (output)

  # Verify actual output against expected output.
  if (singleton_handler_a or singleton_handler_b):
    tree.compare_trees (actual, output_tree,
                        singleton_handler_a, a_baton,
                        singleton_handler_b, b_baton)
  else:
    tree.compare_trees (actual, output_tree)


def run_and_verify_diff_summarize(output_tree, error_re_string = None,
                                  singleton_handler_a = None,
                                  a_baton = None,
                                  singleton_handler_b = None,
                                  b_baton = None,
                                  *args):
  """Run 'diff --summarize' with the arguments *ARGS.
  If ERROR_RE_STRING, the command must exit with error, and the error
  message must match regular expression ERROR_RE_STRING.

  Else if ERROR_RE_STRING is None, the subcommand output will be
  verified against OUTPUT_TREE.  SINGLETON_HANDLER_A and
  SINGLETON_HANDLER_B will be passed to tree.compare_trees - see that
  function's doc string for more details.  Returns on success, raises
  on failure."""

  if isinstance(output_tree, wc.State):
    output_tree = output_tree.old_tree()

  output, errput = main.run_svn (None, 'diff', '--summarize',
                                 '--username', main.wc_author,
                                 '--password', main.wc_passwd,
                                 *args)

  if error_re_string:
    if not error_re_string.startswith(".*"):
      error_re_string = ".*(" + error_re_string + ")"
    expected_err = verify.RegexOutput(error_re_string, match_all=False)
    verify.verify_outputs(None, None, errput, None, expected_err)
    return

  actual = tree.build_tree_from_diff_summarize (output)

  # Verify actual output against expected output.
  try:
    tree.compare_trees (actual, output_tree,
                        singleton_handler_a, a_baton,
                        singleton_handler_b, b_baton)
  except tree.SVNTreeError:
    verify.display_trees(None, 'DIFF OUTPUT TREE', output_tree, actual)
    raise

def run_and_validate_lock(path, username, password):
  """`svn lock' the given path and validate the contents of the lock.
     Use the given username. This is important because locks are
     user specific."""

  comment = "Locking path:%s." % path

  # lock the path
  run_and_verify_svn(None, ".*locked by user", [], 'lock',
                     '--username', username,
                     '--password', password,
                     '-m', comment, path)

  # Run info and check that we get the lock fields.
  output, err = run_and_verify_svn(None, None, [],
                                   'info','-R',
                                   path)

  ### TODO: Leverage RegexOuput([...], match_all=True) here.
  # prepare the regexs to compare against
  token_re = re.compile (".*?Lock Token: opaquelocktoken:.*?", re.DOTALL)
  author_re = re.compile (".*?Lock Owner: %s\n.*?" % username, re.DOTALL)
  created_re = re.compile (".*?Lock Created:.*?", re.DOTALL)
  comment_re = re.compile (".*?%s\n.*?" % re.escape(comment), re.DOTALL)
  # join all output lines into one
  output = "".join(output)
  # Fail even if one regex does not match
  if ( not (token_re.match(output) and \
            author_re.match(output) and \
            created_re.match(output) and \
            comment_re.match(output))):
    raise Failure

######################################################################
# Other general utilities


# This allows a test to *quickly* bootstrap itself.
def make_repo_and_wc(sbox, create_wc = True):
  """Create a fresh repository and checkout a wc from it.

  The repo and wc directories will both be named TEST_NAME, and
  repsectively live within the global dirs 'general_repo_dir' and
  'general_wc_dir' (variables defined at the top of this test
  suite.)  Returns on success, raises on failure."""

  # Create (or copy afresh) a new repos with a greek tree in it.
  guarantee_greek_repository(sbox.repo_dir)

  if create_wc:
    # Generate the expected output tree.
    expected_output = main.greek_state.copy()
    expected_output.wc_dir = sbox.wc_dir
    expected_output.tweak(status='A ', contents=None)

    # Generate an expected wc tree.
    expected_wc = main.greek_state

    # Do a checkout, and verify the resulting output and disk contents.
    run_and_verify_checkout(sbox.repo_url,
                            sbox.wc_dir,
                            expected_output,
                            expected_wc)
  else:
    # just make sure the parent folder of our working copy is created
    try:
      os.mkdir(main.general_wc_dir)
    except OSError, err:
      if err.errno != errno.EEXIST:
        raise

# Duplicate a working copy or other dir.
def duplicate_dir(wc_name, wc_copy_name):
  """Copy the working copy WC_NAME to WC_COPY_NAME.  Overwrite any
  existing tree at that location."""

  main.safe_rmtree(wc_copy_name)
  shutil.copytree(wc_name, wc_copy_name)



def get_virginal_state(wc_dir, rev):
  "Return a virginal greek tree state for a WC and repos at revision REV."

  rev = str(rev) ### maybe switch rev to an integer?

  # copy the greek tree, shift it to the new wc_dir, insert a root elem,
  # then tweak all values
  state = main.greek_state.copy()
  state.wc_dir = wc_dir
  state.desc[''] = wc.StateItem()
  state.tweak(contents=None, status='  ', wc_rev=rev)

  return state

def remove_admin_tmp_dir(wc_dir):
  "Remove the tmp directory within the administrative directory."

  tmp_path = os.path.join(wc_dir, main.get_admin_name(), 'tmp')
  ### Any reason not to use main.safe_rmtree()?
  os.rmdir(os.path.join(tmp_path, 'prop-base'))
  os.rmdir(os.path.join(tmp_path, 'props'))
  os.rmdir(os.path.join(tmp_path, 'text-base'))
  os.rmdir(tmp_path)

# Cheap administrative directory locking
def lock_admin_dir(wc_dir):
  "Lock a SVN administrative directory"

  path = os.path.join(wc_dir, main.get_admin_name(), 'lock')
  main.file_append(path, "stop looking!")

def enable_revprop_changes(repo_dir):
  """Enable revprop changes in a repository REPOS_DIR by creating a
pre-revprop-change hook script and (if appropriate) making it executable."""

  hook_path = main.get_pre_revprop_change_hook_path (repo_dir)
  main.create_python_hook_script (hook_path, 'import sys; sys.exit(0)')

def disable_revprop_changes(repo_dir, message):
  """Disable revprop changes in a repository REPO_DIR by creating a
pre-revprop-change hook script like enable_revprop_changes, except that
the hook prints MESSAGE to stderr and exits non-zero.  MESSAGE is printed
very simply, and should have no newlines or quotes."""

  hook_path = main.get_pre_revprop_change_hook_path (repo_dir)
  main.create_python_hook_script (hook_path,
                                  'import sys\n'
                                  'sys.stderr.write("%s")\n'
                                  'sys.exit(1)\n' % (message,))

def create_failing_post_commit_hook(repo_dir):
  """Disable commits in a repository REPOS_DIR by creating a post-commit hook
script which always reports errors."""

  hook_path = main.get_post_commit_hook_path (repo_dir)
  main.create_python_hook_script (hook_path, 'import sys; '
    'sys.stderr.write("Post-commit hook failed"); '
    'sys.exit(1)')

# set_prop can be used for binary properties are values like '*' which are not
# handled correctly when specified on the command line.
def set_prop(expected_err, name, value, path, valp):
  """Set a property with value from a file"""
  valf = open(valp, 'wb')
  valf.seek(0)
  valf.truncate(0)
  valf.write(value)
  valf.flush()
  main.run_svn(expected_err, 'propset', '-F', valp, name, path)

def check_prop(name, path, exp_out):
  """Verify that property NAME on PATH has a value of EXP_OUT"""
  # Not using run_svn because binary_mode must be set
  out, err = main.run_command(main.svn_binary, None, 1, 'pg', '--strict',
                              name, path, '--config-dir',
                              main.default_config_dir)
  if out != exp_out:
    print "svn pg --strict", name, "output does not match expected."
    print "Expected standard output: ", exp_out, "\n"
    print "Actual standard output: ", out, "\n"
    raise Failure

def fill_file_with_lines(wc_path, line_nbr, line_descrip=None,
                         append=True):
  """Change the file at WC_PATH (adding some lines), and return its
  new contents.  LINE_NBR indicates the line number at which the new
  contents should assume that it's being appended.  LINE_DESCRIP is
  something like 'This is line' (the default) or 'Conflicting line'."""

  if line_descrip is None:
    line_descrip = "This is line"

  # Generate the new contents for the file.
  contents = ""
  for n in range(line_nbr, line_nbr + 3):
    contents = contents + line_descrip + " " + `n` + " in '" + \
               os.path.basename(wc_path) + "'.\n"

  # Write the new contents to the file.
  if append:
    main.file_append(wc_path, contents)
  else:
    main.file_write(wc_path, contents)

  return contents

def inject_conflict_into_wc(sbox, state_path, file_path,
                            expected_disk, expected_status, merged_rev):
  """Create a conflict at FILE_PATH by replacing its contents,
  committing the change, backdating it to its previous revision,
  changing its contents again, then updating it to merge in the
  previous change."""

  wc_dir = sbox.wc_dir

  # Make a change to the file.
  contents = fill_file_with_lines(file_path, 1, "This is line", append=False)

  # Commit the changed file, first taking note of the current revision.
  prev_rev = expected_status.desc[state_path].wc_rev
  expected_output = wc.State(wc_dir, {
    state_path : wc.StateItem(verb='Sending'),
    })
  if expected_status:
    expected_status.tweak(state_path, wc_rev=merged_rev)
  run_and_verify_commit(wc_dir, expected_output, expected_status,
                        None, None, None, None, None,
                        file_path)

  # Backdate the file.
  output, errput = main.run_svn(None, "up", "-r", str(prev_rev),
                                "--username", main.wc_author,
                                "--password", main.wc_passwd,
                                 file_path)
  if expected_status:
    expected_status.tweak(state_path, wc_rev=prev_rev)

  # Make a conflicting change to the file, and backdate the file.
  conflicting_contents = fill_file_with_lines(file_path, 1, "Conflicting line",
                                              append=False)

  # Merge the previous change into the file to produce a conflict.
  if expected_disk:
    expected_disk.tweak(state_path, contents="")
  expected_output = wc.State(wc_dir, {
    state_path : wc.StateItem(status='C '),
    })
  inject_conflict_into_expected_state(state_path,
                                      expected_disk, expected_status,
                                      conflicting_contents, contents,
                                      merged_rev)
  output, errput = main.run_svn(None, "up", "-r", str(merged_rev),
                                "--username", main.wc_author,
                                "--password", main.wc_passwd,
                                sbox.repo_url + "/" + state_path, file_path)
  if expected_status:
    expected_status.tweak(state_path, wc_rev=merged_rev)

def inject_conflict_into_expected_state(state_path,
                                        expected_disk, expected_status,
                                        wc_text, merged_text, merged_rev):
  """Update the EXPECTED_DISK and EXPECTED_STATUS trees for the
  conflict at STATE_PATH (ignored if None).  WC_TEXT, MERGED_TEXT, and
  MERGED_REV are used to determine the contents of the conflict (the
  text parameters should be newline-terminated)."""
  if expected_disk:
    conflict_marker = make_conflict_marker_text(wc_text, merged_text,
                                                merged_rev)
    existing_text = expected_disk.desc[state_path].contents or ""
    expected_disk.tweak(state_path, contents=existing_text + conflict_marker)

  if expected_status:
    expected_status.tweak(state_path, status='C ')

def make_conflict_marker_text(wc_text, merged_text, merged_rev):
  """Return the conflict marker text described by WC_TEXT (the current
  text in the working copy, MERGED_TEXT (the conflicting text merged
  in), and MERGED_REV (the revision from whence the conflicting text
  came)."""
  return "<<<<<<< .working\n" + wc_text + "=======\n" + \
         merged_text + ">>>>>>> .merge-right.r" + str(merged_rev) + "\n"