#!/usr/bin/env python

# test_ftpd.py


#  ======================================================================

#  Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>

#

#                         All Rights Reserved

#

#  Permission to use, copy, modify, and distribute this software and

#  its documentation for any purpose and without fee is hereby

#  granted, provided that the above copyright notice appear in all

#  copies and that both that copyright notice and this permission

#  notice appear in supporting documentation, and that the name of

#  Giampaolo Rodola' not be used in advertising or publicity pertaining to

#  distribution of the software without specific, written prior

#  permission.

#

#  Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,

#  INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN

#  NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR

#  CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS

#  OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,

#  NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN

#  CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

#  ======================================================================



import threading
import unittest
import socket
import os
import atexit
import time
import tempfile
import ftplib
import random
import warnings

from pyftpdlib import ftpserver

__release__ = 'pyftpdlib 0.2.0'


# This test suite has been run successfully on the following systems:


# -------------------------------------------------------

#  System                          | Python version

# -------------------------------------------------------

#  Windows XP prof sp2             | 2.3, 2.4, 2.5, 2.6a

#  Linux CentOS 2.6.20.15          | 2.4

#  Linux Ubuntu 2.6.20-15          | 2.4, 2.5

#  Linux Debian 2.4.27-2-386       | 2.3.5

#  OS X 10.4.10                    | 2.3, 2.4, 2.5

#  FreeBSD 6.0, 7.0                | 2.4, 2.5

# -------------------------------------------------------



# TODO:

# - Test QUIT while a transfer is in progress.

# - Test data transfer in ASCII mode.

# - Test AbstractedFS.translate() on systems having os.sep == ':'.

# - Test FTPHandler.masquearade_address and FTPHandler.passive_ports behaviours



class AbstractedFSClass(unittest.TestCase):

    def test_normalize(self):
        ae = self.assertEquals
        fs = ftpserver.AbstractedFS()

        fs.cwd = '/'
        ae(fs.normalize(''), '/')
        ae(fs.normalize('/'), '/')
        ae(fs.normalize('.'), '/')
        ae(fs.normalize('..'), '/')
        ae(fs.normalize('a'), '/a')
        ae(fs.normalize('/a'), '/a')
        ae(fs.normalize('/a/'), '/a')
        ae(fs.normalize('a/..'), '/')
        ae(fs.normalize('a/b'), '/a/b')
        ae(fs.normalize('a/b/..'), '/a')
        ae(fs.normalize('a/b/../..'), '/')
        fs.cwd = '/sub'
        ae(fs.normalize(''), '/sub')
        ae(fs.normalize('/'), '/')
        ae(fs.normalize('.'), '/sub')
        ae(fs.normalize('..'), '/')
        ae(fs.normalize('a'), '/sub/a')
        ae(fs.normalize('a/'), '/sub/a')
        ae(fs.normalize('a/..'), '/sub')
        ae(fs.normalize('a/b'), '/sub/a/b')
        ae(fs.normalize('a/b/'), '/sub/a/b')
        ae(fs.normalize('a/b/..'), '/sub/a')
        ae(fs.normalize('a/b/../..'), '/sub')
        ae(fs.normalize('a/b/../../..'), '/')
        ae(fs.normalize('//'), '/') # UNC paths must be collapsed


    def test_translate(self):
        ae = self.assertEquals
        fs = ftpserver.AbstractedFS()

        # Unix

        if os.sep == '/':
            fs.root = '/home/user'
            fs.cwd = '/'
            ae(fs.translate(''), '/home/user')
            ae(fs.translate('/'), '/home/user')
            ae(fs.translate('.'), '/home/user')
            ae(fs.translate('..'), '/home/user')
            ae(fs.translate('a'), '/home/user/a')
            ae(fs.translate('/a'), '/home/user/a')
            ae(fs.translate('/a/'), '/home/user/a')
            ae(fs.translate('a/..'), '/home/user')
            ae(fs.translate('a/b'), '/home/user/a/b')
            ae(fs.translate('/a/b'), '/home/user/a/b')
            ae(fs.translate('/a/b/..'), '/home/user/a')
            ae(fs.translate('/a/b/../..'), '/home/user')
            fs.cwd = '/sub'
            ae(fs.translate(''), '/home/user/sub')
            ae(fs.translate('/'), '/home/user')
            ae(fs.translate('.'), '/home/user/sub')
            ae(fs.translate('..'), '/home/user')
            ae(fs.translate('a'), '/home/user/sub/a')
            ae(fs.translate('a/'), '/home/user/sub/a')
            ae(fs.translate('a/..'), '/home/user/sub')
            ae(fs.translate('a/b'), '/home/user/sub/a/b')
            ae(fs.translate('a/b/..'), '/home/user/sub/a')
            ae(fs.translate('a/b/../..'), '/home/user/sub')
            ae(fs.translate('a/b/../../..'), '/home/user')
            ae(fs.translate('//a'), '/home/user/a') # UNC paths must be collapsed


            # the same as above but using the root directory /

            fs.root = '/'
            fs.cwd = '/'
            ae(fs.translate(''), '/')
            ae(fs.translate('/'), '/')
            ae(fs.translate('.'), '/')
            ae(fs.translate('..'), '/')
            ae(fs.translate('a'), '/a')
            ae(fs.translate('/a'), '/a')
            ae(fs.translate('/a/'), '/a')
            ae(fs.translate('a/..'), '/')
            ae(fs.translate('a/b'), '/a/b')
            ae(fs.translate('/a/b'), '/a/b')
            ae(fs.translate('/a/b/..'), '/a')
            ae(fs.translate('/a/b/../..'), '/')
            fs.cwd = '/sub'
            ae(fs.translate(''), '/sub')
            ae(fs.translate('/'), '/')
            ae(fs.translate('.'), '/sub')
            ae(fs.translate('..'), '/')
            ae(fs.translate('a'), '/sub/a')
            ae(fs.translate('a/'), '/sub/a')
            ae(fs.translate('a/..'), '/sub')
            ae(fs.translate('a/b'), '/sub/a/b')
            ae(fs.translate('a/b/..'), '/sub/a')
            ae(fs.translate('a/b/../..'), '/sub')
            ae(fs.translate('a/b/../../..'), '/')
            ae(fs.translate('//a'), '/a') # UNC paths must be collapsed


        # Windows

        elif os.sep == '\\':
            fs.root = r'C:\dir'
            fs.cwd = '/'
            ae(fs.translate(''), r'C:\dir')
            ae(fs.translate('/'), r'C:\dir')
            ae(fs.translate('.'), r'C:\dir')
            ae(fs.translate('..'), r'C:\dir')
            ae(fs.translate('a'), r'C:\dir\a')
            ae(fs.translate('/a'), r'C:\dir\a')
            ae(fs.translate('/a/'), r'C:\dir\a')
            ae(fs.translate('a/..'), r'C:\dir')
            ae(fs.translate('a/b'), r'C:\dir\a\b')
            ae(fs.translate('/a/b'), r'C:\dir\a\b')
            ae(fs.translate('/a/b/..'), r'C:\dir\a')
            ae(fs.translate('/a/b/../..'), r'C:\dir')
            fs.cwd = '/sub'
            ae(fs.translate(''), r'C:\dir\sub')
            ae(fs.translate('/'), r'C:\dir')
            ae(fs.translate('.'), r'C:\dir\sub')
            ae(fs.translate('..'), r'C:\dir')
            ae(fs.translate('a'), r'C:\dir\sub\a')
            ae(fs.translate('a/'), r'C:\dir\sub\a')
            ae(fs.translate('a/..'), r'C:\dir\sub')
            ae(fs.translate('a/b'), r'C:\dir\sub\a\b')
            ae(fs.translate('a/b/..'), r'C:\dir\sub\a')
            ae(fs.translate('a/b/../..'), r'C:\dir\sub')
            ae(fs.translate('a/b/../../..'), r'C:\dir')
            ae(fs.translate('//a'), r'C:\dir\a') # UNC paths must be collapsed


            # the same as above but using the drive root C:\

            fs.root = 'C:\\'
            fs.cwd = '/'
            ae(fs.translate(''), 'C:\\')
            ae(fs.translate('/'), 'C:\\')
            ae(fs.translate('.'), 'C:\\')
            ae(fs.translate('..'), 'C:\\')
            ae(fs.translate('a'), r'C:\a')
            ae(fs.translate('/a'), r'C:\a')
            ae(fs.translate('/a/'), r'C:\a')
            ae(fs.translate('a/..'), 'C:\\')
            ae(fs.translate('a/b'), r'C:\a\b')
            ae(fs.translate('/a/b'), r'C:\a\b')
            ae(fs.translate('/a/b/..'), r'C:\a')
            ae(fs.translate('/a/b/../..'), 'C:\\')
            fs.cwd = '/sub'
            ae(fs.translate(''), r'C:\sub')
            ae(fs.translate('/'), 'C:\\')
            ae(fs.translate('.'), 'C:\\sub')
            ae(fs.translate('..'), 'C:\\')
            ae(fs.translate('a'), r'C:\sub\a')
            ae(fs.translate('a/'), r'C:\sub\a')
            ae(fs.translate('a/..'), r'C:\sub')
            ae(fs.translate('a/b'), r'C:\sub\a\b')
            ae(fs.translate('a/b/..'), r'C:\sub\a')
            ae(fs.translate('a/b/../..'), r'C:\sub')
            ae(fs.translate('a/b/../../..'), 'C:\\')
            ae(fs.translate('//a'), r'C:\a') # UNC paths must be collapsed


        # On Mac OS 8 & 9, the folder delimiter is colon (":"), and the path

        # separator is a semi-colon (";").  Not enough hardware (...and money

        # ;-)) for creating tests.

        else:
            self.fail('Test not available for such system (os.sep == "%s").'
                        %os.sep)


class DummyAuthorizerClass(unittest.TestCase):

    # temporarily change warnings to exceptions for the purposes of testing

    def setUp(self):
        warnings.filterwarnings("error")

    def tearDown(self):
        warnings.resetwarnings()

    def test_dummy_authorizer(self):
        auth = ftpserver.DummyAuthorizer()
        auth.user_table = {}

        # create user

        auth.add_user(user, pwd, home, perm=('r', 'w'))
        auth.add_anonymous(home)
        # check credentials

        self.failUnless(auth.validate_authentication(user, pwd))
        self.failIf(auth.validate_authentication(user, 'wrongpwd'))
        # remove them

        auth.remove_user(user)
        auth.remove_user('anonymous')
        
        # raise exc if user does not exists

        self.assertRaises(KeyError, auth.remove_user, user)
        # raise exc if path does not exist

        self.assertRaises(ftpserver.AuthorizerError, auth.add_user, user,
                            pwd, '?:\\')
        self.assertRaises(ftpserver.AuthorizerError, auth.add_anonymous, '?:\\')
        # raise exc if user already exists

        auth.add_user(user, pwd, home)
        auth.add_anonymous(home)
        self.assertRaises(ftpserver.AuthorizerError, auth.add_user, user,
                            pwd, home)
        self.assertRaises(ftpserver.AuthorizerError, auth.add_anonymous, home)
        auth.remove_user(user)
        auth.remove_user('anonymous')

        # raise on wrong permission

        self.assertRaises(ftpserver.AuthorizerError, auth.add_user, user, pwd,
                            home, perm=('?'))
        self.assertRaises(ftpserver.AuthorizerError, auth.add_anonymous, home,
                            perm=('?'))
        # expect warning on 'w' permission assigned to anonymous user

        self.assertRaises(RuntimeWarning, auth.add_anonymous, home, perm=('w'))


class FtpAuthentication(unittest.TestCase):
    "test: USER, PASS, REIN"

    def setUp(self):
        global ftp
        ftp = ftplib.FTP()
        ftp.connect(host=host, port=port)
        self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
        self.f2 = open(tempfile.mktemp(dir=home), 'w+b')

    def tearDown(self):
        ftp.close()
        if not self.f1.closed:
            self.f1.close()
        if not self.f2.closed:
            self.f2.close()
        os.remove(self.f1.name)
        os.remove(self.f2.name)

    def test_auth_ok(self):
        ftp.login(user=user, passwd=pwd)

    def test_auth_failed(self):
        self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')

    def test_anon_auth(self):
        ftp.login(user='anonymous', passwd='anon@')
        ftp.login(user='AnonYmoUs', passwd='anon@')
        ftp.login(user='anonymous', passwd='')

    def test_max_auth(self):
        self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
        self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
        self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
        # If authentication fails for 3 times ftpd disconnect us.

        # We can check if this happen by using ftp.sendcmd() on the 'dead'

        # socket object.  If socket object is really dead it should be raised

        # socket.error exception (Windows) or EOFError exception (Linux).

        self.failUnlessRaises((socket.error, EOFError), ftp.sendcmd, '')

    def test_rein(self):
        """Test REIN while no transfer is in progress."""
        ftp.login(user=user, passwd=pwd)
        ftp.sendcmd('rein')
        # user is not yet authenticated, a permission error response is expected

        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
        # by logging-in again we should be able to execute a file-system command

        ftp.login(user=user, passwd=pwd)
        ftp.sendcmd('pwd')

    def test_rein_on_transfer(self):
        """Test REIN while a transfer is in progress."""
        ftp.login(user=user, passwd=pwd)
        data = 'abcde12345' * 100000
        fname_1 = os.path.basename(self.f1.name)
        self.f1.write(data)
        self.f1.close()

        ftp.voidcmd('TYPE I')
        conn = ftp.transfercmd('retr ' + fname_1)
        bytes_recv = 0
        rein_sent = 0
        while 1:
            chunk = conn.recv(8192)
            # stop transfer while it isn't finished yet

            if bytes_recv >= 524288: # 2^19

                if not rein_sent:
                    # flush account, expect an error response

                    ftp.sendcmd('rein')
                    self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
                    rein_sent = 1
            if not chunk:
                break
            self.f2.write(chunk)
            bytes_recv += len(chunk)

        # a 226 response is expected once tranfer finishes

        self.assertEqual(ftp.voidresp()[:3], '226')
        # account is still flushed, error response is still expected

        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'size ' + fname_1)
        # by logging-in again we should be able to execute a file-system command

        ftp.login(user=user, passwd=pwd)
        ftp.sendcmd('pwd')
        self.f2.seek(0)
        self.assertEqual(hash(data), hash (self.f2.read()))

    def test_user(self):
        """Test USER while already authenticated and no transfer is in progress.
        """
        ftp.login(user=user, passwd=pwd)
        ftp.sendcmd('user ' + user)
        # user is not yet authenticated, a permission error response is expected

        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
        # by logging-in again we should be able to execute a file-system command

        ftp.sendcmd('pass ' + pwd)
        ftp.sendcmd('pwd')

    def test_user_on_transfer(self):
        """Test USER while already authenticated and a transfer is in progress.
        """
        ftp.login(user=user, passwd=pwd)
        data = 'abcde12345' * 100000
        fname_1 = os.path.basename(self.f1.name)
        self.f1.write(data)
        self.f1.close()

        ftp.voidcmd('TYPE I')
        conn = ftp.transfercmd('retr ' + fname_1)
        bytes_recv = 0
        rein_sent = 0
        while 1:
            chunk = conn.recv(8192)
            # stop transfer while it isn't finished yet

            if bytes_recv >= 524288: # 2^19

                if not rein_sent:
                    # flush account, expect an error response

                    ftp.sendcmd('user ' + user)
                    self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
                    rein_sent = 1
            if not chunk:
                break
            self.f2.write(chunk)
            bytes_recv += len(chunk)

        # a 226 response is expected once tranfer finishes

        self.assertEqual(ftp.voidresp()[:3], '226')
        # account is still flushed, error response is still expected

        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
        # by logging-in again we should be able to execute a file-system command

        ftp.sendcmd('pass ' + pwd)
        ftp.sendcmd('pwd')
        self.f2.seek(0)
        self.assertEqual(hash(data), hash (self.f2.read()))


class FtpDummyCmds(unittest.TestCase):
    "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP"

    def setUp(self):
        global ftp
        ftp = ftplib.FTP()
        ftp.connect(host=host, port=port)
        ftp.login(user=user, passwd=pwd)

    def tearDown(self):
        ftp.close()

    def test_type(self):
        ftp.sendcmd('type a')
        ftp.sendcmd('type i')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'type')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'type ?!?')

    def test_stru(self):
        ftp.sendcmd('stru f')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'stru')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'stru ?!?')

    def test_mode(self):
        ftp.sendcmd('mode s')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'mode')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'mode ?!?')

    def test_noop(self):
        ftp.sendcmd('noop')

    def test_syst(self):
        ftp.sendcmd('syst')

    def test_allo(self):
        ftp.sendcmd('allo x')

    def test_help(self):
        ftp.sendcmd('help')
        cmd = random.choice(ftpserver.proto_cmds.keys())
        ftp.sendcmd('help %s' %cmd)
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'help ?!?')

    def test_rest(self):
        # just test rest's semantic without using data-transfer

        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'rest')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'rest str')
        self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'rest -1')

    def test_quit(self):
        ftp.sendcmd('quit')


class FtpFsOperations(unittest.TestCase):
    "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"

    def setUp(self):
        global ftp
        ftp = ftplib.FTP()
        ftp.connect(host=host, port=port)
        ftp.login(user=user, passwd=pwd)
        self.tempfile = os.path.basename(open(tempfile.mktemp(dir=home), 'w+b').name)
        self.tempdir = os.path.basename(tempfile.mktemp(dir=home))
        os.mkdir(self.tempdir)

    def tearDown(self):
        ftp.close()
        if os.path.exists(self.tempfile):
            os.remove(self.tempfile)
        if os.path.exists(self.tempdir):
            os.rmdir(self.tempdir)

    def test_cwd(self):
        ftp.cwd(self.tempdir)
        self.assertRaises(ftplib.error_perm, ftp.cwd, 'subtempdir')

    def test_pwd(self):
        self.assertEqual(ftp.pwd(), '/')
        ftp.cwd(self.tempdir)
        self.assertEqual(ftp.pwd(), '/' + self.tempdir)

    def test_cdup(self):
        # ftplib.parse257 function is usually used for parsing the '257'

        # response for a MKD or PWD request returning the directory name

        # in the 257 reply.

        # Even if CDUP response code is different (250) we could use parse257

        # anyway for getting directory name.

        ftp.cwd(self.tempdir)
        dir = ftplib.parse257(ftp.sendcmd('cdup').replace('250', '257'))
        self.assertEqual(dir, '/')
        dir = ftplib.parse257(ftp.sendcmd('cdup').replace('250', '257'))
        self.assertEqual(dir, '/')

    def test_mkd(self):
        tempdir = os.path.basename(tempfile.mktemp(dir=home))
        ftp.mkd(tempdir)
        # make sure we can't create directories which already exist (probably

        # not really necessary);

        # let's use a try/except statement to avoid leaving behind orphaned

        # temporary directory in the event of a test failure.

        try:
            ftp.mkd(tempdir)
        except ftplib.error_perm, err:
            os.rmdir(tempdir) # ok

        else:
            self.fail('ftplib.error_perm not raised.')

    def test_rmd(self):
        ftp.rmd(self.tempdir)
        # make sure we can't use rmd against files

        self.assertRaises(ftplib.error_perm, ftp.rmd, self.tempfile)
        # make sure we can't remove root directory

        self.assertRaises(ftplib.error_perm, ftp.rmd, '/')

    def test_dele(self):
        ftp.delete(self.tempfile)
        # make sure we can't rename root directory, just to be safe,

        # maybe not really necessary...

        self.assertRaises(ftplib.error_perm, ftp.delete, self.tempdir)

    def test_rnfr_rnto(self):
        # rename file

        tempname = os.path.basename(tempfile.mktemp(dir=home))
        ftp.rename(self.tempfile, tempname)
        ftp.rename(tempname, self.tempfile)
        # rename dir

        tempname = os.path.basename(tempfile.mktemp(dir=home))
        ftp.rename(self.tempdir, tempname)
        ftp.rename(tempname, self.tempdir)
        # rnfr/rnto over non-existing paths

        bogus = os.path.basename(tempfile.mktemp(dir=home))
        self.assertRaises(ftplib.error_perm, ftp.rename, bogus, '/x')
        self.assertRaises(ftplib.error_perm, ftp.rename, self.tempfile, '/')
        # make sure we can't rename root directory, just to be safe,

        # maybe not really necessary...

        self.assertRaises(ftplib.error_perm, ftp.rename, '/', '/x')

    def test_mdtm(self):
        ftp.sendcmd('mdtm ' + self.tempfile)
        # make sure we can't use mdtm against directories

        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'mdtm ' + self.tempdir)

    def test_size(self):
        ftp.size(self.tempfile)
        # make sure we can't use size against directories

        self.assertRaises(ftplib.error_perm, ftp.size, self.tempdir)

    def test_stat(self):
        ftp.sendcmd('stat')
        ftp.sendcmd('stat *')
        ftp.sendcmd('stat ' + self.tempfile)
        ftp.sendcmd('stat ' + self.tempdir)
        self.failUnless('Directory is empty' in ftp.sendcmd('stat '+ self.tempdir))
        self.failUnless('recursion not supported' in ftp.sendcmd('stat /*/*'))


class FtpRetrieveData(unittest.TestCase):
    "test: RETR, REST, LIST, NLST"

    def setUp(self):
        global ftp
        ftp = ftplib.FTP()
        ftp.connect(host=host, port=port)
        ftp.login(user=user, passwd=pwd)
        self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
        self.f2 = open(tempfile.mktemp(dir=home), 'w+b')

    def tearDown(self):
        ftp.close()
        if not self.f1.closed:
            self.f1.close()
        if not self.f2.closed:
            self.f2.close()
        os.remove(self.f1.name)
        os.remove(self.f2.name)

    def test_retr(self):
        data = 'abcde12345' * 100000
        self.f1.write(data)
        self.f1.close()
        remote_fname = os.path.basename(self.f1.name)
        ftp.retrbinary("retr " + remote_fname, self.f2.write)
        self.f2.seek(0)
        self.assertEqual(hash(data), hash(self.f2.read()))

    def test_restore_on_retr(self):
        data = 'abcde12345' * 100000
        fname_1 = os.path.basename(self.f1.name)
        fname_2 = os.path.basename(self.f2.name)
        self.f1.write(data)
        self.f1.close()

        # look at ftplib.FTP.retrbinary method to understand this mess

        ftp.voidcmd('TYPE I')
        conn = ftp.transfercmd('retr ' + fname_1)
        bytes_recv = 0
        while 1:
            chunk = conn.recv(8192)
            # stop transfer while it isn't finished yet

            if bytes_recv >= 524288: # 2^19

                break
            elif not chunk:
                break
            self.f2.write(chunk)
            bytes_recv += len(chunk)
        conn.close()

        # transfer wasn't finished yet so we expect a 426 response

        self.failUnlessRaises(ftplib.error_temp, ftp.voidresp)

        # resuming transfer by using a marker value greater than the file

        # size stored on the server should result in an error on retr

        file_size = ftp.size(fname_1)
        ftp.sendcmd('rest %s' %((file_size + 1)))
        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'retr ' + fname_1)

        # test resume

        ftp.sendcmd('rest %s' %bytes_recv)
        ftp.retrbinary("retr " + fname_1, self.f2.write)
        self.f2.seek(0)
        self.assertEqual(hash(data), hash (self.f2.read()))

    def test_list(self):
        l = []
        ftp.retrlines('LIST ' + os.path.basename(self.f1.name), l.append)
        self.assertEqual(len(l), 1)
        l = []
        l1, l2, l3, l4 = [], [], [], []
        ftp.retrlines('LIST', l.append)
        ftp.retrlines('LIST -a', l1.append)
        ftp.retrlines('LIST -l', l2.append)
        ftp.retrlines('LIST -al', l3.append)
        ftp.retrlines('LIST -la', l4.append)
        x = [l, l1, l2, l3, l4]
        for i in range(0,4):
            self.assertEqual(x[i], x[i+1])

    def test_nlst(self):
        l = []
        ftp.retrlines('NLST', l.append)
        fname = os.path.basename(self.f1.name)
        self.assertRaises(ftplib.error_perm, ftp.retrlines, 'NLST ' + fname, l.append)


class FtpAbort(unittest.TestCase):
    "test: ABOR"

    def setUp(self):
        global ftp
        ftp = ftplib.FTP()
        ftp.connect(host=host, port=port)
        ftp.login(user=user, passwd=pwd)
        self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
        self.f2 = open(tempfile.mktemp(dir=home), 'w+b')

    def tearDown(self):
        ftp.close()
        if not self.f1.closed:
            self.f1.close()
        if not self.f2.closed:
            self.f2.close()
        os.remove(self.f1.name)
        os.remove(self.f2.name)

    def test_abor_no_data(self):
        # Case 1: ABOR while no data channel is opened: respond with 225.

        resp = ftp.sendcmd('ABOR')
        self.failUnlessEqual('225 No transfer to abort.', resp)

    def test_abor_pasv(self):
        # Case 2: user sends a PASV, a data-channel socket is listening but not

        # connected, and ABOR is sent: close listening data socket, respond

        # with 225.

        ftp.sendcmd('PASV')
        respcode = ftp.sendcmd('ABOR')[:3]
        self.failUnlessEqual('225', respcode)

    def test_abor_port(self):
        # Case 3: data channel opened with PASV or PORT, but ABOR sent before

        # a data transfer has been started: close data channel, respond with 225

        ftp.makeport()
        respcode = ftp.sendcmd('ABOR')[:3]
        self.failUnlessEqual('225', respcode)

    def test_abor(self):
        # Case 4: ABOR while a data transfer on DTP channel is in progress:

        # close data channel, respond with 426, respond with 226.

        data = 'abcde12345' * 100000
        self.f1.write(data)
        self.f1.close()

        # this ugly loop construct is to simulate an interrupted transfer since

        # ftplib doesn't like running storbinary() in a separate thread

        ftp.voidcmd('TYPE I')
        conn = ftp.transfercmd('retr ' + os.path.basename(self.f1.name))
        bytes_recv = 0
        while 1:
            chunk = conn.recv(8192)
            # stop transfer while it isn't finished yet

            if bytes_recv >= 524288: # 2^19

                break
            elif not chunk:
                break
            self.f2.write(chunk)
            bytes_recv += len(chunk)
        ftp.putcmd('ABOR')

        # transfer isn't finished yet so ftpd should respond with 426

        self.failUnlessRaises(ftplib.error_temp, ftp.voidresp)

        # transfer successfully aborted, so should now respond with a 226

        self.failUnlessEqual('226', ftp.voidresp()[:3])


class FtpStoreData(unittest.TestCase):
    "test: STOR, STOU, APPE, REST"

    def setUp(self):
        global ftp
        ftp = ftplib.FTP()
        ftp.connect(host=host, port=port)
        ftp.login(user=user, passwd=pwd)
        self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
        self.f2 = open(tempfile.mktemp(dir=home), 'w+b')

    def tearDown(self):
        ftp.close()
        if not self.f1.closed:
            self.f1.close()
        if not self.f2.closed:
            self.f2.close()
        os.remove(self.f1.name)
        os.remove(self.f2.name)

    def test_stor(self):
        data = 'abcde12345' * 100000
        self.f1.write(data)
        self.f1.seek(0)
        remote_fname = os.path.basename(tempfile.mktemp(dir=home))
        ftp.storbinary('stor ' + remote_fname, self.f1)
        ftp.retrbinary('retr ' + remote_fname, self.f2.write)
        self.f2.seek(0)
        self.assertEqual(hash(data), hash (self.f2.read()))
        # we do not use os.remove because file could be still locked by ftpd thread

        ftp.delete(remote_fname)

    def test_stou(self):
        data = 'abcde12345' * 100000
        self.f1.write(data)
        self.f1.seek(0)

        ftp.voidcmd('TYPE I')
        # filename comes in as 1xx FILE: <filename>

        filename = ftp.sendcmd('stou').split('FILE: ')[1]
        sock = ftp.makeport()
        conn, sockaddr = sock.accept()
        while 1:
            buf = self.f1.read(8192)
            if not buf:
                break
            conn.sendall(buf)
        conn.close()
        # transfer finished, a 226 response is expected

        ftp.voidresp()
        ftp.retrbinary('retr ' + filename, self.f2.write)
        self.f2.seek(0)
        self.assertEqual(hash(data), hash (self.f2.read()))
        # we do not use os.remove because file could be

        # still locked by ftpd thread

        ftp.delete(filename)

    def test_stou_rest(self):
        # watch for STOU preceded by REST, which makes no sense.

        ftp.sendcmd('rest 10')
        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'stou')

    def test_appe(self):
        fname_1 = os.path.basename(self.f1.name)
        fname_2 = os.path.basename(self.f2.name)
        remote_fname = os.path.basename(tempfile.mktemp(dir=home))

        data1 = 'abcde12345' * 100000
        self.f1.write(data1)
        self.f1.seek(0)
        ftp.storbinary('stor ' + remote_fname, self.f1)

        data2 = 'fghil67890' * 100000
        self.f1.write(data2)
        self.f1.seek(ftp.size(remote_fname))
        ftp.storbinary('appe ' + remote_fname, self.f1)

        ftp.retrbinary("retr " + remote_fname, self.f2.write)
        self.f2.seek(0)
        self.assertEqual(hash(data1 + data2), hash (self.f2.read()))
        ftp.delete(remote_fname)

    def test_appe_rest(self):
        # watch for APPE preceded by REST, which makes no sense.

        ftp.sendcmd('rest 10')
        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'appe x')

    def test_rest_on_stor(self):
        fname_1 = os.path.basename(self.f1.name)
        fname_2 = os.path.basename(self.f2.name)
        remote_fname = os.path.basename(tempfile.mktemp(dir=home))

        data = 'abcde12345' * 100000
        self.f1.write(data)
        self.f1.seek(0)

        ftp.voidcmd('TYPE I')
        conn = ftp.transfercmd('stor ' + remote_fname)
        bytes_sent = 0
        while 1:
            chunk = self.f1.read(8192)
            conn.sendall(chunk)
            bytes_sent += len(chunk)
            # stop transfer while it isn't finished yet

            if bytes_sent >= 524288: # 2^19

                break
            elif not chunk:
                break
        conn.close()
        # transfer wasn't finished yet so we expect a 426 response

        ftp.voidresp()

        # resuming transfer by using a marker value greater than the file

        # size stored on the server should result in an error on stor

        file_size = ftp.size(remote_fname)
        self.assertEqual(file_size, bytes_sent)
        ftp.sendcmd('rest %s' %((file_size + 1)))
        self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'stor ' + remote_fname)

        ftp.sendcmd('rest %s' %bytes_sent)
        ftp.storbinary('stor ' + remote_fname, self.f1)

        ftp.retrbinary('retr ' + remote_fname, self.f2.write)
        self.f1.seek(0)
        self.f2.seek(0)
        self.assertEqual(hash(self.f1.read()), hash(self.f2.read()))
        ftp.delete(remote_fname)


def run():
    class ftpd(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)

        def run(self):
            def devnull(msg):
                pass
            ftpserver.log = devnull
            ftpserver.logline = devnull
            ftpserver.debug = devnull
            authorizer = ftpserver.DummyAuthorizer()
            authorizer.add_user(user, pwd, home, perm=('r', 'w'))
            authorizer.add_anonymous(home)
            ftp_handler = ftpserver.FTPHandler
            ftp_handler.authorizer = authorizer
            address = (host, port)
            ftpd = ftpserver.FTPServer(address, ftp_handler)
            ftpd.serve_forever()

    def exit_fun():
        os._exit(0)
    atexit.register(exit_fun)

    f = ftpd()
    f.start()
    time.sleep(0.3)
    unittest.main()


host = '127.0.0.1'
port = 54321
user = 'user'
pwd = '12345'
home = os.getcwd()

if __name__ == '__main__':
    run()


syntax highlighted by Code2HTML, v. 0.9.1