# Copyright (C) 2002-2006, Stefan Schwarzer
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# - Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
#
# - Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
#
# - Neither the name of the above author nor the names of the
#   contributors to the software may be used to endorse or promote
#   products derived from this software without specific prior written
#   permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# $Id: _test_ftputil.py 689 2007-04-16 01:07:10Z schwa $

import ftplib
import operator
import os
import posixpath
import random
import stat
import time
import unittest

import _mock_ftplib
import _test_base
import ftp_error
import ftp_file
import ftp_stat
import ftputil

#
# helper functions to generate random data
#
def random_data(pool, size=10000):
    """
    Return a sequence of characters consisting of those from
    the pool of integer numbers.
    """
    character_list = []
    for i in range(size):
        ordinal = random.choice(pool)
        character_list.append(chr(ordinal))
    result = ''.join(character_list)
    return result

def ascii_data():
    """Return an ASCII character string."""
    pool = range(32, 128)
    pool.append(ord('\n'))
    return random_data(pool)

def binary_data():
    """Return a binary character string."""
    pool = range(0, 256)
    return random_data(pool)


#
# several customized `MockSession` classes
#
class FailOnLoginSession(_mock_ftplib.MockSession):
    def __init__(self, host='', user='', password=''):
        raise ftplib.error_perm

class ReadMockSession(_mock_ftplib.MockSession):
    mock_file_content = 'line 1\r\nanother line\r\nyet another line'

class AsciiReadMockSession(_mock_ftplib.MockSession):
    mock_file_content = '\r\n'.join(map(str, range(20)))

class BinaryDownloadMockSession(_mock_ftplib.MockSession):
    mock_file_content = binary_data()

class TimeShiftMockSession(_mock_ftplib.MockSession):
    def delete(self, file_name):
        pass

class InaccessibleDirSession(_mock_ftplib.MockSession):
    _login_dir = '/inaccessible'

    def pwd(self):
        return self._login_dir

    def cwd(self, dir):
        if dir in (self._login_dir, self._login_dir + '/'):
            raise ftplib.error_perm
        else:
            _mock_ftplib.MockSession.cwd(self, dir)

#
# customized `FTPHost` class for conditional upload/download tests
#  and time shift tests
#
class FailingUploadAndDownloadFTPHost(ftputil.FTPHost):
    def upload(self, source, target, mode=''):
        assert False, "`FTPHost.upload` should not have been called"

    def download(self, source, target, mode=''):
        assert False, "`FTPHost.download` should not have been called"

class TimeShiftFTPHost(ftputil.FTPHost):
    class _Path:
        def split(self, path):
            return posixpath.split(path)
        def set_mtime(self, mtime):
            self._mtime = mtime
        def getmtime(self, file_name):
            return self._mtime
        def abspath(self, path):
            return "/home/sschwarzer/_ftputil_sync_"
        # needed for `isdir` in `FTPHost.remove`
        def isfile(self, path):
            return True

    def __init__(self, *args, **kwargs):
        ftputil.FTPHost.__init__(self, *args, **kwargs)
        self.path = self._Path()

#
# test cases
#
class TestOpenAndClose(unittest.TestCase):
    """Test opening and closing of `FTPHost` objects."""
    def test_open_and_close(self):
        """Test closing of `FTPHost`."""
        host = _test_base.ftp_host_factory()
        host.close()
        self.assertEqual(host.closed, 1)
        self.assertEqual(host._children, [])


class TestLogin(unittest.TestCase):
    def test_invalid_login(self):
        """Login to invalid host must fail."""
        self.assertRaises(ftp_error.FTPOSError, _test_base.ftp_host_factory,
                          FailOnLoginSession)


class TestSetParser(unittest.TestCase):
    def test_set_parser(self):
        """Test if the selected parser is used."""
        # this test isn't very practical but should help at least a bit ...
        host = _test_base.ftp_host_factory()
        # implicitly fix at Unix format
        files = host.listdir("/home/sschwarzer")
        self.assertEqual(files, ['chemeng', 'download', 'image', 'index.html',
          'os2', 'osup', 'publications', 'python', 'scios2'])
        host.set_parser(ftp_stat.MSParser())
        files = host.listdir("/home/msformat/XPLaunch")
        self.assertEqual(files, ['WindowsXP', 'XPLaunch', 'empty',
          'abcd.exe', 'O2KKeys.exe'])
        self.assertEqual(host._stat._allow_parser_switching, False)


class TestFileOperations(unittest.TestCase):
    """Test operations with file-like objects."""
    def test_inaccessible_dir(self):
        """Test whether opening a file at an invalid location fails."""
        host = _test_base.ftp_host_factory(
               session_factory=InaccessibleDirSession)
        self.assertRaises(ftp_error.FTPIOError, host.file,
                          '/inaccessible/new_file', 'w')

    def test_caching(self):
        """Test whether `_FTPFile` cache of `FTPHost` object works."""
        host = _test_base.ftp_host_factory()
        self.assertEqual(len(host._children), 0)
        path1 = 'path1'
        path2 = 'path2'
        # open one file and inspect cache
        file1 = host.file(path1, 'w')
        child1 = host._children[0]
        self.assertEqual(len(host._children), 1)
        self.failIf(child1._file.closed)
        # open another file
        file2 = host.file(path2, 'w')
        child2 = host._children[1]
        self.assertEqual(len(host._children), 2)
        self.failIf(child2._file.closed)
        # close first file
        file1.close()
        self.assertEqual(len(host._children), 2)
        self.failUnless(child1._file.closed)
        self.failIf(child2._file.closed)
        # re-open first child's file
        file1 = host.file(path1, 'w')
        child1_1 = file1._host
        # check if it's reused
        self.failUnless(child1 is child1_1)
        self.failIf(child1._file.closed)
        self.failIf(child2._file.closed)
        # close second file
        file2.close()
        self.failUnless(child2._file.closed)

    def test_write_to_directory(self):
        """Test whether attempting to write to a directory fails."""
        host = _test_base.ftp_host_factory()
        self.assertRaises(ftp_error.FTPIOError, host.file,
                          '/home/sschwarzer', 'w')

    def test_binary_write(self):
        """Write binary data with `write`."""
        host = _test_base.ftp_host_factory()
        data = '\000a\001b\r\n\002c\003\n\004\r\005'
        output = host.file('dummy', 'wb')
        output.write(data)
        output.close()
        child_data = _mock_ftplib.content_of('dummy')
        expected_data = data
        self.assertEqual(child_data, expected_data)

    def test_ascii_write(self):
        """Write ASCII text with `write`."""
        host = _test_base.ftp_host_factory()
        data = ' \nline 2\nline 3'
        output = host.file('dummy', 'w')
        output.write(data)
        output.close()
        child_data = _mock_ftplib.content_of('dummy')
        expected_data = ' \r\nline 2\r\nline 3'
        self.assertEqual(child_data, expected_data)

    def test_ascii_writelines(self):
        """Write ASCII text with `writelines`."""
        host = _test_base.ftp_host_factory()
        data = [' \n', 'line 2\n', 'line 3']
        backup_data = data[:]
        output = host.file('dummy', 'w')
        output.writelines(data)
        output.close()
        child_data = _mock_ftplib.content_of('dummy')
        expected_data = ' \r\nline 2\r\nline 3'
        self.assertEqual(child_data, expected_data)
        # ensure that the original data was not modified
        self.assertEqual(data, backup_data)

    def test_ascii_read(self):
        """Read ASCII text with plain `read`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'r')
        data = input_.read(0)
        self.assertEqual(data, '')
        data = input_.read(3)
        self.assertEqual(data, 'lin')
        data = input_.read(7)
        self.assertEqual(data, 'e 1\nano')
        data = input_.read()
        self.assertEqual(data, 'ther line\nyet another line')
        data = input_.read()
        self.assertEqual(data, '')
        input_.close()
        # try it again with a more "problematic" string which
        #  makes several reads in the `read` method necessary
        host = _test_base.ftp_host_factory(session_factory=AsciiReadMockSession)
        expected_data = AsciiReadMockSession.mock_file_content.\
                        replace('\r\n', '\n')
        input_ = host.file('dummy', 'r')
        data = input_.read(len(expected_data))
        self.assertEqual(data, expected_data)

    def test_binary_readline(self):
        """Read binary data with `readline`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'rb')
        data = input_.readline(3)
        self.assertEqual(data, 'lin')
        data = input_.readline(10)
        self.assertEqual(data, 'e 1\r\n')
        data = input_.readline(13)
        self.assertEqual(data, 'another line\r')
        data = input_.readline()
        self.assertEqual(data, '\n')
        data = input_.readline()
        self.assertEqual(data, 'yet another line')
        data = input_.readline()
        self.assertEqual(data, '')
        input_.close()

    def test_ascii_readline(self):
        """Read ASCII text with `readline`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'r')
        data = input_.readline(3)
        self.assertEqual(data, 'lin')
        data = input_.readline(10)
        self.assertEqual(data, 'e 1\n')
        data = input_.readline(13)
        self.assertEqual(data, 'another line\n')
        data = input_.readline()
        self.assertEqual(data, 'yet another line')
        data = input_.readline()
        self.assertEqual(data, '')
        input_.close()

    def test_ascii_readlines(self):
        """Read ASCII text with `readlines`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'r')
        data = input_.read(3)
        self.assertEqual(data, 'lin')
        data = input_.readlines()
        self.assertEqual(data, ['e 1\n', 'another line\n',
                                'yet another line'])
        input_.close()

    def test_ascii_xreadlines(self):
        """Read ASCII text with `xreadlines`."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        # open file, skip some bytes
        input_ = host.file('dummy', 'r')
        data = input_.read(3)
        xrl_obj = input_.xreadlines()
        self.failUnless(xrl_obj.__class__ is ftp_file._XReadlines)
        self.failUnless(xrl_obj._ftp_file.__class__ is ftp_file._FTPFile)
        data = xrl_obj[0]
        self.assertEqual(data, 'e 1\n')
        # try to skip an index
        self.assertRaises(RuntimeError, operator.__getitem__, xrl_obj, 2)
        # continue reading
        data = xrl_obj[1]
        self.assertEqual(data, 'another line\n')
        data = xrl_obj[2]
        self.assertEqual(data, 'yet another line')
        # try to read beyond EOF
        self.assertRaises(IndexError, operator.__getitem__, xrl_obj, 3)

    def test_binary_iterator(self):
        """Test the iterator interface of `FTPFile` objects."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy')
        input_iterator = iter(input_)
        self.assertEqual(input_iterator.next(), "line 1\n")
        self.assertEqual(input_iterator.next(), "another line\n")
        self.assertEqual(input_iterator.next(), "yet another line")
        self.assertRaises(StopIteration, input_iterator.next)
        input_.close()

    def test_ascii_iterator(self):
        """Test the iterator interface of `FTPFile` objects."""
        host = _test_base.ftp_host_factory(session_factory=ReadMockSession)
        input_ = host.file('dummy', 'rb')
        input_iterator = iter(input_)
        self.assertEqual(input_iterator.next(), "line 1\r\n")
        self.assertEqual(input_iterator.next(), "another line\r\n")
        self.assertEqual(input_iterator.next(), "yet another line")
        self.assertRaises(StopIteration, input_iterator.next)
        input_.close()

    def test_read_unknown_file(self):
        """Test whether reading a file which isn't there fails."""
        host = _test_base.ftp_host_factory()
        self.assertRaises(ftp_error.FTPIOError, host.file, 'notthere', 'r')


class TestUploadAndDownload(unittest.TestCase):
    """Test ASCII upload and binary download as examples."""
    def generate_ascii_file(self, data, filename):
        """Generate an ASCII data file."""
        source_file = open(filename, 'w')
        source_file.write(data)
        source_file.close()

    def test_ascii_upload(self):
        """Test ASCII mode upload."""
        local_source = '__test_source'
        data = ascii_data()
        self.generate_ascii_file(data, local_source)
        # upload
        host = _test_base.ftp_host_factory()
        host.upload(local_source, 'dummy')
        # check uploaded content
        # the data which was uploaded has its line endings converted
        #  so the conversion must also be applied to `data`
        data = data.replace('\n', '\r\n')
        remote_file_content = _mock_ftplib.content_of('dummy')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(local_source)

    def test_binary_download(self):
        """Test binary mode download."""
        local_target = '__test_target'
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        # download
        host.download('dummy', local_target, 'b')
        # read file and compare
        data = open(local_target, 'rb').read()
        remote_file_content = _mock_ftplib.content_of('dummy')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(local_target)

    def test_conditional_upload(self):
        """Test conditional ASCII mode upload."""
        local_source = '__test_source'
        data = ascii_data()
        self.generate_ascii_file(data, local_source)
        # target is newer, so don't upload
        host = _test_base.ftp_host_factory(
               ftp_host_class=FailingUploadAndDownloadFTPHost)
        flag = host.upload_if_newer(local_source, '/home/newer')
        self.assertEqual(flag, False)
        # target is older, so upload
        host = _test_base.ftp_host_factory()
        flag = host.upload_if_newer(local_source, '/home/older')
        self.assertEqual(flag, True)
        # check uploaded content
        # the data which was uploaded has its line endings converted
        #  so the conversion must also be applied to 'data'
        data = data.replace('\n', '\r\n')
        remote_file_content = _mock_ftplib.content_of('older')
        self.assertEqual(data, remote_file_content)
        # target doesn't exist, so upload
        host = _test_base.ftp_host_factory()
        flag = host.upload_if_newer(local_source, '/home/notthere')
        self.assertEqual(flag, True)
        remote_file_content = _mock_ftplib.content_of('notthere')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(local_source)

    def compare_and_delete_downloaded_data(self, filename):
        """Compare content of downloaded file with its source, then
        delete the local target file."""
        data = open(filename, 'rb').read()
        remote_file_content = _mock_ftplib.content_of('newer')
        self.assertEqual(data, remote_file_content)
        # clean up
        os.unlink(filename)

    def test_conditional_download_without_target(self):
        "Test conditional binary mode download when no target file exists."
        local_target = '__test_target'
        # target does not exist, so download
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        flag = host.download_if_newer('/home/newer', local_target, 'b')
        self.assertEqual(flag, True)
        self.compare_and_delete_downloaded_data(local_target)

    def test_conditional_download_with_older_target(self):
        """Test conditional binary mode download with newer source file."""
        local_target = '__test_target'
        # make target file
        open(local_target, 'w').close()
        # source is newer, so download
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        flag = host.download_if_newer('/home/newer', local_target, 'b')
        self.assertEqual(flag, True)
        self.compare_and_delete_downloaded_data(local_target)

    def test_conditional_download_with_newer_target(self):
        """Test conditional binary mode download with older source file."""
        local_target = '__test_target'
        # make target file
        open(local_target, 'w').close()
        # source is older, so don't download
        host = _test_base.ftp_host_factory(
               session_factory=BinaryDownloadMockSession)
        host = _test_base.ftp_host_factory(
               ftp_host_class=FailingUploadAndDownloadFTPHost,
               session_factory=BinaryDownloadMockSession)
        flag = host.download_if_newer('/home/older', local_target, 'b')
        self.assertEqual(flag, False)
        # remove target file
        os.unlink(local_target)


class TestTimeShift(unittest.TestCase):
    def test_rounded_time_shift(self):
        """Test if time shift is rounded correctly."""
        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # use private bound method
        rounded_time_shift = host._FTPHost__rounded_time_shift
        # original value, expected result
        test_data = [
          (0, 0), (0.1, 0), (-0.1, 0), (1500, 0), (-1500, 0),
          (1800, 3600), (-1800, -3600), (2000, 3600), (-2000, -3600),
          (5*3600-100, 5*3600), (-5*3600+100, -5*3600)]
        for time_shift, expected_time_shift in test_data:
            calculated_time_shift = rounded_time_shift(time_shift)
            self.assertEqual(calculated_time_shift, expected_time_shift)

    def test_assert_valid_time_shift(self):
        """Test time shift sanity checks."""
        host = _test_base.ftp_host_factory(session_factory=TimeShiftMockSession)
        # use private bound method
        assert_time_shift = host._FTPHost__assert_valid_time_shift
        # valid time shifts
        test_data = [23*3600, -23*3600, 3600+30, -3600+30]
        for time_shift in test_data:
            self.failUnless(assert_time_shift(time_shift) is None)
        # invalid time shift (exceeds one day)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 25*3600)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, -25*3600)
        # invalid time shift (deviation from full hours unacceptable)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift, 10*60)
        self.assertRaises(ftp_error.TimeShiftError, assert_time_shift,
                          -3600-10*60)

    def test_synchronize_times(self):
        """Test time synchronization with server."""
        host = _test_base.ftp_host_factory(ftp_host_class=TimeShiftFTPHost,
               session_factory=TimeShiftMockSession)
        # valid time shift
        host.path.set_mtime(time.time() + 3630)
        host.synchronize_times()
        self.assertEqual(host.time_shift(), 3600)
        # invalid time shift
        host.path.set_mtime(time.time() + 3600+10*60)
        self.assertRaises(ftp_error.TimeShiftError, host.synchronize_times)


if __name__ == '__main__':
    unittest.main()



syntax highlighted by Code2HTML, v. 0.9.1