#!/usr/bin/env python
# test_ftpd.py
# ======================================================================
# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# Giampaolo Rodola' not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
import threading
import unittest
import socket
import os
import atexit
import time
import tempfile
import ftplib
import random
import warnings
from pyftpdlib import ftpserver
__release__ = 'pyftpdlib 0.2.0'
# This test suite has been run successfully on the following systems:
# -------------------------------------------------------
# System | Python version
# -------------------------------------------------------
# Windows XP prof sp2 | 2.3, 2.4, 2.5, 2.6a
# Linux CentOS 2.6.20.15 | 2.4
# Linux Ubuntu 2.6.20-15 | 2.4, 2.5
# Linux Debian 2.4.27-2-386 | 2.3.5
# OS X 10.4.10 | 2.3, 2.4, 2.5
# FreeBSD 6.0, 7.0 | 2.4, 2.5
# -------------------------------------------------------
# TODO:
# - Test QUIT while a transfer is in progress.
# - Test data transfer in ASCII mode.
# - Test AbstractedFS.translate() on systems having os.sep == ':'.
# - Test FTPHandler.masquearade_address and FTPHandler.passive_ports behaviours
class AbstractedFSClass(unittest.TestCase):
def test_normalize(self):
ae = self.assertEquals
fs = ftpserver.AbstractedFS()
fs.cwd = '/'
ae(fs.normalize(''), '/')
ae(fs.normalize('/'), '/')
ae(fs.normalize('.'), '/')
ae(fs.normalize('..'), '/')
ae(fs.normalize('a'), '/a')
ae(fs.normalize('/a'), '/a')
ae(fs.normalize('/a/'), '/a')
ae(fs.normalize('a/..'), '/')
ae(fs.normalize('a/b'), '/a/b')
ae(fs.normalize('a/b/..'), '/a')
ae(fs.normalize('a/b/../..'), '/')
fs.cwd = '/sub'
ae(fs.normalize(''), '/sub')
ae(fs.normalize('/'), '/')
ae(fs.normalize('.'), '/sub')
ae(fs.normalize('..'), '/')
ae(fs.normalize('a'), '/sub/a')
ae(fs.normalize('a/'), '/sub/a')
ae(fs.normalize('a/..'), '/sub')
ae(fs.normalize('a/b'), '/sub/a/b')
ae(fs.normalize('a/b/'), '/sub/a/b')
ae(fs.normalize('a/b/..'), '/sub/a')
ae(fs.normalize('a/b/../..'), '/sub')
ae(fs.normalize('a/b/../../..'), '/')
ae(fs.normalize('//'), '/') # UNC paths must be collapsed
def test_translate(self):
ae = self.assertEquals
fs = ftpserver.AbstractedFS()
# Unix
if os.sep == '/':
fs.root = '/home/user'
fs.cwd = '/'
ae(fs.translate(''), '/home/user')
ae(fs.translate('/'), '/home/user')
ae(fs.translate('.'), '/home/user')
ae(fs.translate('..'), '/home/user')
ae(fs.translate('a'), '/home/user/a')
ae(fs.translate('/a'), '/home/user/a')
ae(fs.translate('/a/'), '/home/user/a')
ae(fs.translate('a/..'), '/home/user')
ae(fs.translate('a/b'), '/home/user/a/b')
ae(fs.translate('/a/b'), '/home/user/a/b')
ae(fs.translate('/a/b/..'), '/home/user/a')
ae(fs.translate('/a/b/../..'), '/home/user')
fs.cwd = '/sub'
ae(fs.translate(''), '/home/user/sub')
ae(fs.translate('/'), '/home/user')
ae(fs.translate('.'), '/home/user/sub')
ae(fs.translate('..'), '/home/user')
ae(fs.translate('a'), '/home/user/sub/a')
ae(fs.translate('a/'), '/home/user/sub/a')
ae(fs.translate('a/..'), '/home/user/sub')
ae(fs.translate('a/b'), '/home/user/sub/a/b')
ae(fs.translate('a/b/..'), '/home/user/sub/a')
ae(fs.translate('a/b/../..'), '/home/user/sub')
ae(fs.translate('a/b/../../..'), '/home/user')
ae(fs.translate('//a'), '/home/user/a') # UNC paths must be collapsed
# the same as above but using the root directory /
fs.root = '/'
fs.cwd = '/'
ae(fs.translate(''), '/')
ae(fs.translate('/'), '/')
ae(fs.translate('.'), '/')
ae(fs.translate('..'), '/')
ae(fs.translate('a'), '/a')
ae(fs.translate('/a'), '/a')
ae(fs.translate('/a/'), '/a')
ae(fs.translate('a/..'), '/')
ae(fs.translate('a/b'), '/a/b')
ae(fs.translate('/a/b'), '/a/b')
ae(fs.translate('/a/b/..'), '/a')
ae(fs.translate('/a/b/../..'), '/')
fs.cwd = '/sub'
ae(fs.translate(''), '/sub')
ae(fs.translate('/'), '/')
ae(fs.translate('.'), '/sub')
ae(fs.translate('..'), '/')
ae(fs.translate('a'), '/sub/a')
ae(fs.translate('a/'), '/sub/a')
ae(fs.translate('a/..'), '/sub')
ae(fs.translate('a/b'), '/sub/a/b')
ae(fs.translate('a/b/..'), '/sub/a')
ae(fs.translate('a/b/../..'), '/sub')
ae(fs.translate('a/b/../../..'), '/')
ae(fs.translate('//a'), '/a') # UNC paths must be collapsed
# Windows
elif os.sep == '\\':
fs.root = r'C:\dir'
fs.cwd = '/'
ae(fs.translate(''), r'C:\dir')
ae(fs.translate('/'), r'C:\dir')
ae(fs.translate('.'), r'C:\dir')
ae(fs.translate('..'), r'C:\dir')
ae(fs.translate('a'), r'C:\dir\a')
ae(fs.translate('/a'), r'C:\dir\a')
ae(fs.translate('/a/'), r'C:\dir\a')
ae(fs.translate('a/..'), r'C:\dir')
ae(fs.translate('a/b'), r'C:\dir\a\b')
ae(fs.translate('/a/b'), r'C:\dir\a\b')
ae(fs.translate('/a/b/..'), r'C:\dir\a')
ae(fs.translate('/a/b/../..'), r'C:\dir')
fs.cwd = '/sub'
ae(fs.translate(''), r'C:\dir\sub')
ae(fs.translate('/'), r'C:\dir')
ae(fs.translate('.'), r'C:\dir\sub')
ae(fs.translate('..'), r'C:\dir')
ae(fs.translate('a'), r'C:\dir\sub\a')
ae(fs.translate('a/'), r'C:\dir\sub\a')
ae(fs.translate('a/..'), r'C:\dir\sub')
ae(fs.translate('a/b'), r'C:\dir\sub\a\b')
ae(fs.translate('a/b/..'), r'C:\dir\sub\a')
ae(fs.translate('a/b/../..'), r'C:\dir\sub')
ae(fs.translate('a/b/../../..'), r'C:\dir')
ae(fs.translate('//a'), r'C:\dir\a') # UNC paths must be collapsed
# the same as above but using the drive root C:\
fs.root = 'C:\\'
fs.cwd = '/'
ae(fs.translate(''), 'C:\\')
ae(fs.translate('/'), 'C:\\')
ae(fs.translate('.'), 'C:\\')
ae(fs.translate('..'), 'C:\\')
ae(fs.translate('a'), r'C:\a')
ae(fs.translate('/a'), r'C:\a')
ae(fs.translate('/a/'), r'C:\a')
ae(fs.translate('a/..'), 'C:\\')
ae(fs.translate('a/b'), r'C:\a\b')
ae(fs.translate('/a/b'), r'C:\a\b')
ae(fs.translate('/a/b/..'), r'C:\a')
ae(fs.translate('/a/b/../..'), 'C:\\')
fs.cwd = '/sub'
ae(fs.translate(''), r'C:\sub')
ae(fs.translate('/'), 'C:\\')
ae(fs.translate('.'), 'C:\\sub')
ae(fs.translate('..'), 'C:\\')
ae(fs.translate('a'), r'C:\sub\a')
ae(fs.translate('a/'), r'C:\sub\a')
ae(fs.translate('a/..'), r'C:\sub')
ae(fs.translate('a/b'), r'C:\sub\a\b')
ae(fs.translate('a/b/..'), r'C:\sub\a')
ae(fs.translate('a/b/../..'), r'C:\sub')
ae(fs.translate('a/b/../../..'), 'C:\\')
ae(fs.translate('//a'), r'C:\a') # UNC paths must be collapsed
# On Mac OS 8 & 9, the folder delimiter is colon (":"), and the path
# separator is a semi-colon (";"). Not enough hardware (...and money
# ;-)) for creating tests.
else:
self.fail('Test not available for such system (os.sep == "%s").'
%os.sep)
class DummyAuthorizerClass(unittest.TestCase):
# temporarily change warnings to exceptions for the purposes of testing
def setUp(self):
warnings.filterwarnings("error")
def tearDown(self):
warnings.resetwarnings()
def test_dummy_authorizer(self):
auth = ftpserver.DummyAuthorizer()
auth.user_table = {}
# create user
auth.add_user(user, pwd, home, perm=('r', 'w'))
auth.add_anonymous(home)
# check credentials
self.failUnless(auth.validate_authentication(user, pwd))
self.failIf(auth.validate_authentication(user, 'wrongpwd'))
# remove them
auth.remove_user(user)
auth.remove_user('anonymous')
# raise exc if user does not exists
self.assertRaises(KeyError, auth.remove_user, user)
# raise exc if path does not exist
self.assertRaises(ftpserver.AuthorizerError, auth.add_user, user,
pwd, '?:\\')
self.assertRaises(ftpserver.AuthorizerError, auth.add_anonymous, '?:\\')
# raise exc if user already exists
auth.add_user(user, pwd, home)
auth.add_anonymous(home)
self.assertRaises(ftpserver.AuthorizerError, auth.add_user, user,
pwd, home)
self.assertRaises(ftpserver.AuthorizerError, auth.add_anonymous, home)
auth.remove_user(user)
auth.remove_user('anonymous')
# raise on wrong permission
self.assertRaises(ftpserver.AuthorizerError, auth.add_user, user, pwd,
home, perm=('?'))
self.assertRaises(ftpserver.AuthorizerError, auth.add_anonymous, home,
perm=('?'))
# expect warning on 'w' permission assigned to anonymous user
self.assertRaises(RuntimeWarning, auth.add_anonymous, home, perm=('w'))
class FtpAuthentication(unittest.TestCase):
"test: USER, PASS, REIN"
def setUp(self):
global ftp
ftp = ftplib.FTP()
ftp.connect(host=host, port=port)
self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
self.f2 = open(tempfile.mktemp(dir=home), 'w+b')
def tearDown(self):
ftp.close()
if not self.f1.closed:
self.f1.close()
if not self.f2.closed:
self.f2.close()
os.remove(self.f1.name)
os.remove(self.f2.name)
def test_auth_ok(self):
ftp.login(user=user, passwd=pwd)
def test_auth_failed(self):
self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
def test_anon_auth(self):
ftp.login(user='anonymous', passwd='anon@')
ftp.login(user='AnonYmoUs', passwd='anon@')
ftp.login(user='anonymous', passwd='')
def test_max_auth(self):
self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
self.failUnlessRaises(ftplib.error_perm, ftp.login, user, passwd='wrong')
# If authentication fails for 3 times ftpd disconnect us.
# We can check if this happen by using ftp.sendcmd() on the 'dead'
# socket object. If socket object is really dead it should be raised
# socket.error exception (Windows) or EOFError exception (Linux).
self.failUnlessRaises((socket.error, EOFError), ftp.sendcmd, '')
def test_rein(self):
"""Test REIN while no transfer is in progress."""
ftp.login(user=user, passwd=pwd)
ftp.sendcmd('rein')
# user is not yet authenticated, a permission error response is expected
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
# by logging-in again we should be able to execute a file-system command
ftp.login(user=user, passwd=pwd)
ftp.sendcmd('pwd')
def test_rein_on_transfer(self):
"""Test REIN while a transfer is in progress."""
ftp.login(user=user, passwd=pwd)
data = 'abcde12345' * 100000
fname_1 = os.path.basename(self.f1.name)
self.f1.write(data)
self.f1.close()
ftp.voidcmd('TYPE I')
conn = ftp.transfercmd('retr ' + fname_1)
bytes_recv = 0
rein_sent = 0
while 1:
chunk = conn.recv(8192)
# stop transfer while it isn't finished yet
if bytes_recv >= 524288: # 2^19
if not rein_sent:
# flush account, expect an error response
ftp.sendcmd('rein')
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
rein_sent = 1
if not chunk:
break
self.f2.write(chunk)
bytes_recv += len(chunk)
# a 226 response is expected once tranfer finishes
self.assertEqual(ftp.voidresp()[:3], '226')
# account is still flushed, error response is still expected
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'size ' + fname_1)
# by logging-in again we should be able to execute a file-system command
ftp.login(user=user, passwd=pwd)
ftp.sendcmd('pwd')
self.f2.seek(0)
self.assertEqual(hash(data), hash (self.f2.read()))
def test_user(self):
"""Test USER while already authenticated and no transfer is in progress.
"""
ftp.login(user=user, passwd=pwd)
ftp.sendcmd('user ' + user)
# user is not yet authenticated, a permission error response is expected
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
# by logging-in again we should be able to execute a file-system command
ftp.sendcmd('pass ' + pwd)
ftp.sendcmd('pwd')
def test_user_on_transfer(self):
"""Test USER while already authenticated and a transfer is in progress.
"""
ftp.login(user=user, passwd=pwd)
data = 'abcde12345' * 100000
fname_1 = os.path.basename(self.f1.name)
self.f1.write(data)
self.f1.close()
ftp.voidcmd('TYPE I')
conn = ftp.transfercmd('retr ' + fname_1)
bytes_recv = 0
rein_sent = 0
while 1:
chunk = conn.recv(8192)
# stop transfer while it isn't finished yet
if bytes_recv >= 524288: # 2^19
if not rein_sent:
# flush account, expect an error response
ftp.sendcmd('user ' + user)
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
rein_sent = 1
if not chunk:
break
self.f2.write(chunk)
bytes_recv += len(chunk)
# a 226 response is expected once tranfer finishes
self.assertEqual(ftp.voidresp()[:3], '226')
# account is still flushed, error response is still expected
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'pwd')
# by logging-in again we should be able to execute a file-system command
ftp.sendcmd('pass ' + pwd)
ftp.sendcmd('pwd')
self.f2.seek(0)
self.assertEqual(hash(data), hash (self.f2.read()))
class FtpDummyCmds(unittest.TestCase):
"test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP"
def setUp(self):
global ftp
ftp = ftplib.FTP()
ftp.connect(host=host, port=port)
ftp.login(user=user, passwd=pwd)
def tearDown(self):
ftp.close()
def test_type(self):
ftp.sendcmd('type a')
ftp.sendcmd('type i')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'type')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'type ?!?')
def test_stru(self):
ftp.sendcmd('stru f')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'stru')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'stru ?!?')
def test_mode(self):
ftp.sendcmd('mode s')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'mode')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'mode ?!?')
def test_noop(self):
ftp.sendcmd('noop')
def test_syst(self):
ftp.sendcmd('syst')
def test_allo(self):
ftp.sendcmd('allo x')
def test_help(self):
ftp.sendcmd('help')
cmd = random.choice(ftpserver.proto_cmds.keys())
ftp.sendcmd('help %s' %cmd)
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'help ?!?')
def test_rest(self):
# just test rest's semantic without using data-transfer
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'rest')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'rest str')
self.failUnlessRaises(ftplib.error_perm, ftp.sendcmd, 'rest -1')
def test_quit(self):
ftp.sendcmd('quit')
class FtpFsOperations(unittest.TestCase):
"test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
def setUp(self):
global ftp
ftp = ftplib.FTP()
ftp.connect(host=host, port=port)
ftp.login(user=user, passwd=pwd)
self.tempfile = os.path.basename(open(tempfile.mktemp(dir=home), 'w+b').name)
self.tempdir = os.path.basename(tempfile.mktemp(dir=home))
os.mkdir(self.tempdir)
def tearDown(self):
ftp.close()
if os.path.exists(self.tempfile):
os.remove(self.tempfile)
if os.path.exists(self.tempdir):
os.rmdir(self.tempdir)
def test_cwd(self):
ftp.cwd(self.tempdir)
self.assertRaises(ftplib.error_perm, ftp.cwd, 'subtempdir')
def test_pwd(self):
self.assertEqual(ftp.pwd(), '/')
ftp.cwd(self.tempdir)
self.assertEqual(ftp.pwd(), '/' + self.tempdir)
def test_cdup(self):
# ftplib.parse257 function is usually used for parsing the '257'
# response for a MKD or PWD request returning the directory name
# in the 257 reply.
# Even if CDUP response code is different (250) we could use parse257
# anyway for getting directory name.
ftp.cwd(self.tempdir)
dir = ftplib.parse257(ftp.sendcmd('cdup').replace('250', '257'))
self.assertEqual(dir, '/')
dir = ftplib.parse257(ftp.sendcmd('cdup').replace('250', '257'))
self.assertEqual(dir, '/')
def test_mkd(self):
tempdir = os.path.basename(tempfile.mktemp(dir=home))
ftp.mkd(tempdir)
# make sure we can't create directories which already exist (probably
# not really necessary);
# let's use a try/except statement to avoid leaving behind orphaned
# temporary directory in the event of a test failure.
try:
ftp.mkd(tempdir)
except ftplib.error_perm, err:
os.rmdir(tempdir) # ok
else:
self.fail('ftplib.error_perm not raised.')
def test_rmd(self):
ftp.rmd(self.tempdir)
# make sure we can't use rmd against files
self.assertRaises(ftplib.error_perm, ftp.rmd, self.tempfile)
# make sure we can't remove root directory
self.assertRaises(ftplib.error_perm, ftp.rmd, '/')
def test_dele(self):
ftp.delete(self.tempfile)
# make sure we can't rename root directory, just to be safe,
# maybe not really necessary...
self.assertRaises(ftplib.error_perm, ftp.delete, self.tempdir)
def test_rnfr_rnto(self):
# rename file
tempname = os.path.basename(tempfile.mktemp(dir=home))
ftp.rename(self.tempfile, tempname)
ftp.rename(tempname, self.tempfile)
# rename dir
tempname = os.path.basename(tempfile.mktemp(dir=home))
ftp.rename(self.tempdir, tempname)
ftp.rename(tempname, self.tempdir)
# rnfr/rnto over non-existing paths
bogus = os.path.basename(tempfile.mktemp(dir=home))
self.assertRaises(ftplib.error_perm, ftp.rename, bogus, '/x')
self.assertRaises(ftplib.error_perm, ftp.rename, self.tempfile, '/')
# make sure we can't rename root directory, just to be safe,
# maybe not really necessary...
self.assertRaises(ftplib.error_perm, ftp.rename, '/', '/x')
def test_mdtm(self):
ftp.sendcmd('mdtm ' + self.tempfile)
# make sure we can't use mdtm against directories
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'mdtm ' + self.tempdir)
def test_size(self):
ftp.size(self.tempfile)
# make sure we can't use size against directories
self.assertRaises(ftplib.error_perm, ftp.size, self.tempdir)
def test_stat(self):
ftp.sendcmd('stat')
ftp.sendcmd('stat *')
ftp.sendcmd('stat ' + self.tempfile)
ftp.sendcmd('stat ' + self.tempdir)
self.failUnless('Directory is empty' in ftp.sendcmd('stat '+ self.tempdir))
self.failUnless('recursion not supported' in ftp.sendcmd('stat /*/*'))
class FtpRetrieveData(unittest.TestCase):
"test: RETR, REST, LIST, NLST"
def setUp(self):
global ftp
ftp = ftplib.FTP()
ftp.connect(host=host, port=port)
ftp.login(user=user, passwd=pwd)
self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
self.f2 = open(tempfile.mktemp(dir=home), 'w+b')
def tearDown(self):
ftp.close()
if not self.f1.closed:
self.f1.close()
if not self.f2.closed:
self.f2.close()
os.remove(self.f1.name)
os.remove(self.f2.name)
def test_retr(self):
data = 'abcde12345' * 100000
self.f1.write(data)
self.f1.close()
remote_fname = os.path.basename(self.f1.name)
ftp.retrbinary("retr " + remote_fname, self.f2.write)
self.f2.seek(0)
self.assertEqual(hash(data), hash(self.f2.read()))
def test_restore_on_retr(self):
data = 'abcde12345' * 100000
fname_1 = os.path.basename(self.f1.name)
fname_2 = os.path.basename(self.f2.name)
self.f1.write(data)
self.f1.close()
# look at ftplib.FTP.retrbinary method to understand this mess
ftp.voidcmd('TYPE I')
conn = ftp.transfercmd('retr ' + fname_1)
bytes_recv = 0
while 1:
chunk = conn.recv(8192)
# stop transfer while it isn't finished yet
if bytes_recv >= 524288: # 2^19
break
elif not chunk:
break
self.f2.write(chunk)
bytes_recv += len(chunk)
conn.close()
# transfer wasn't finished yet so we expect a 426 response
self.failUnlessRaises(ftplib.error_temp, ftp.voidresp)
# resuming transfer by using a marker value greater than the file
# size stored on the server should result in an error on retr
file_size = ftp.size(fname_1)
ftp.sendcmd('rest %s' %((file_size + 1)))
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'retr ' + fname_1)
# test resume
ftp.sendcmd('rest %s' %bytes_recv)
ftp.retrbinary("retr " + fname_1, self.f2.write)
self.f2.seek(0)
self.assertEqual(hash(data), hash (self.f2.read()))
def test_list(self):
l = []
ftp.retrlines('LIST ' + os.path.basename(self.f1.name), l.append)
self.assertEqual(len(l), 1)
l = []
l1, l2, l3, l4 = [], [], [], []
ftp.retrlines('LIST', l.append)
ftp.retrlines('LIST -a', l1.append)
ftp.retrlines('LIST -l', l2.append)
ftp.retrlines('LIST -al', l3.append)
ftp.retrlines('LIST -la', l4.append)
x = [l, l1, l2, l3, l4]
for i in range(0,4):
self.assertEqual(x[i], x[i+1])
def test_nlst(self):
l = []
ftp.retrlines('NLST', l.append)
fname = os.path.basename(self.f1.name)
self.assertRaises(ftplib.error_perm, ftp.retrlines, 'NLST ' + fname, l.append)
class FtpAbort(unittest.TestCase):
"test: ABOR"
def setUp(self):
global ftp
ftp = ftplib.FTP()
ftp.connect(host=host, port=port)
ftp.login(user=user, passwd=pwd)
self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
self.f2 = open(tempfile.mktemp(dir=home), 'w+b')
def tearDown(self):
ftp.close()
if not self.f1.closed:
self.f1.close()
if not self.f2.closed:
self.f2.close()
os.remove(self.f1.name)
os.remove(self.f2.name)
def test_abor_no_data(self):
# Case 1: ABOR while no data channel is opened: respond with 225.
resp = ftp.sendcmd('ABOR')
self.failUnlessEqual('225 No transfer to abort.', resp)
def test_abor_pasv(self):
# Case 2: user sends a PASV, a data-channel socket is listening but not
# connected, and ABOR is sent: close listening data socket, respond
# with 225.
ftp.sendcmd('PASV')
respcode = ftp.sendcmd('ABOR')[:3]
self.failUnlessEqual('225', respcode)
def test_abor_port(self):
# Case 3: data channel opened with PASV or PORT, but ABOR sent before
# a data transfer has been started: close data channel, respond with 225
ftp.makeport()
respcode = ftp.sendcmd('ABOR')[:3]
self.failUnlessEqual('225', respcode)
def test_abor(self):
# Case 4: ABOR while a data transfer on DTP channel is in progress:
# close data channel, respond with 426, respond with 226.
data = 'abcde12345' * 100000
self.f1.write(data)
self.f1.close()
# this ugly loop construct is to simulate an interrupted transfer since
# ftplib doesn't like running storbinary() in a separate thread
ftp.voidcmd('TYPE I')
conn = ftp.transfercmd('retr ' + os.path.basename(self.f1.name))
bytes_recv = 0
while 1:
chunk = conn.recv(8192)
# stop transfer while it isn't finished yet
if bytes_recv >= 524288: # 2^19
break
elif not chunk:
break
self.f2.write(chunk)
bytes_recv += len(chunk)
ftp.putcmd('ABOR')
# transfer isn't finished yet so ftpd should respond with 426
self.failUnlessRaises(ftplib.error_temp, ftp.voidresp)
# transfer successfully aborted, so should now respond with a 226
self.failUnlessEqual('226', ftp.voidresp()[:3])
class FtpStoreData(unittest.TestCase):
"test: STOR, STOU, APPE, REST"
def setUp(self):
global ftp
ftp = ftplib.FTP()
ftp.connect(host=host, port=port)
ftp.login(user=user, passwd=pwd)
self.f1 = open(tempfile.mktemp(dir=home), 'w+b')
self.f2 = open(tempfile.mktemp(dir=home), 'w+b')
def tearDown(self):
ftp.close()
if not self.f1.closed:
self.f1.close()
if not self.f2.closed:
self.f2.close()
os.remove(self.f1.name)
os.remove(self.f2.name)
def test_stor(self):
data = 'abcde12345' * 100000
self.f1.write(data)
self.f1.seek(0)
remote_fname = os.path.basename(tempfile.mktemp(dir=home))
ftp.storbinary('stor ' + remote_fname, self.f1)
ftp.retrbinary('retr ' + remote_fname, self.f2.write)
self.f2.seek(0)
self.assertEqual(hash(data), hash (self.f2.read()))
# we do not use os.remove because file could be still locked by ftpd thread
ftp.delete(remote_fname)
def test_stou(self):
data = 'abcde12345' * 100000
self.f1.write(data)
self.f1.seek(0)
ftp.voidcmd('TYPE I')
# filename comes in as 1xx FILE: <filename>
filename = ftp.sendcmd('stou').split('FILE: ')[1]
sock = ftp.makeport()
conn, sockaddr = sock.accept()
while 1:
buf = self.f1.read(8192)
if not buf:
break
conn.sendall(buf)
conn.close()
# transfer finished, a 226 response is expected
ftp.voidresp()
ftp.retrbinary('retr ' + filename, self.f2.write)
self.f2.seek(0)
self.assertEqual(hash(data), hash (self.f2.read()))
# we do not use os.remove because file could be
# still locked by ftpd thread
ftp.delete(filename)
def test_stou_rest(self):
# watch for STOU preceded by REST, which makes no sense.
ftp.sendcmd('rest 10')
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'stou')
def test_appe(self):
fname_1 = os.path.basename(self.f1.name)
fname_2 = os.path.basename(self.f2.name)
remote_fname = os.path.basename(tempfile.mktemp(dir=home))
data1 = 'abcde12345' * 100000
self.f1.write(data1)
self.f1.seek(0)
ftp.storbinary('stor ' + remote_fname, self.f1)
data2 = 'fghil67890' * 100000
self.f1.write(data2)
self.f1.seek(ftp.size(remote_fname))
ftp.storbinary('appe ' + remote_fname, self.f1)
ftp.retrbinary("retr " + remote_fname, self.f2.write)
self.f2.seek(0)
self.assertEqual(hash(data1 + data2), hash (self.f2.read()))
ftp.delete(remote_fname)
def test_appe_rest(self):
# watch for APPE preceded by REST, which makes no sense.
ftp.sendcmd('rest 10')
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'appe x')
def test_rest_on_stor(self):
fname_1 = os.path.basename(self.f1.name)
fname_2 = os.path.basename(self.f2.name)
remote_fname = os.path.basename(tempfile.mktemp(dir=home))
data = 'abcde12345' * 100000
self.f1.write(data)
self.f1.seek(0)
ftp.voidcmd('TYPE I')
conn = ftp.transfercmd('stor ' + remote_fname)
bytes_sent = 0
while 1:
chunk = self.f1.read(8192)
conn.sendall(chunk)
bytes_sent += len(chunk)
# stop transfer while it isn't finished yet
if bytes_sent >= 524288: # 2^19
break
elif not chunk:
break
conn.close()
# transfer wasn't finished yet so we expect a 426 response
ftp.voidresp()
# resuming transfer by using a marker value greater than the file
# size stored on the server should result in an error on stor
file_size = ftp.size(remote_fname)
self.assertEqual(file_size, bytes_sent)
ftp.sendcmd('rest %s' %((file_size + 1)))
self.assertRaises(ftplib.error_perm, ftp.sendcmd, 'stor ' + remote_fname)
ftp.sendcmd('rest %s' %bytes_sent)
ftp.storbinary('stor ' + remote_fname, self.f1)
ftp.retrbinary('retr ' + remote_fname, self.f2.write)
self.f1.seek(0)
self.f2.seek(0)
self.assertEqual(hash(self.f1.read()), hash(self.f2.read()))
ftp.delete(remote_fname)
def run():
class ftpd(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
def devnull(msg):
pass
ftpserver.log = devnull
ftpserver.logline = devnull
ftpserver.debug = devnull
authorizer = ftpserver.DummyAuthorizer()
authorizer.add_user(user, pwd, home, perm=('r', 'w'))
authorizer.add_anonymous(home)
ftp_handler = ftpserver.FTPHandler
ftp_handler.authorizer = authorizer
address = (host, port)
ftpd = ftpserver.FTPServer(address, ftp_handler)
ftpd.serve_forever()
def exit_fun():
os._exit(0)
atexit.register(exit_fun)
f = ftpd()
f.start()
time.sleep(0.3)
unittest.main()
host = '127.0.0.1'
port = 54321
user = 'user'
pwd = '12345'
home = os.getcwd()
if __name__ == '__main__':
run()
syntax highlighted by Code2HTML, v. 0.9.1