# -*- test-case-name: twisted.names.test.test_names -*- # Copyright (c) 2001-2004 Twisted Matrix Laboratories. # See LICENSE for details. """ Test cases for twisted.names. """ from __future__ import nested_scopes import socket, operator, copy from twisted.trial import unittest from twisted.internet import reactor, defer, error from twisted.internet.defer import succeed from twisted.names import client, server, common, authority, hosts, dns from twisted.python import failure from twisted.names.error import DNSFormatError, DNSServerError, DNSNameError from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError from twisted.names.error import DNSUnknownError from twisted.names.dns import EFORMAT, ESERVER, ENAME, ENOTIMP, EREFUSED from twisted.names.dns import Message from twisted.names.client import Resolver def justPayload(results): return [r.payload for r in results[0]] class NoFileAuthority(authority.FileAuthority): def __init__(self, soa, records): # Yes, skip FileAuthority common.ResolverBase.__init__(self) self.soa, self.records = soa, records soa_record = dns.Record_SOA( mname = 'test-domain.com', rname = 'root.test-domain.com', serial = 100, refresh = 1234, minimum = 7654, expire = 19283784, retry = 15, ttl=1 ) reverse_soa = dns.Record_SOA( mname = '93.84.28.in-addr.arpa', rname = '93.84.28.in-addr.arpa', serial = 120, refresh = 54321, minimum = 382, expire = 11193983, retry = 30, ttl=3 ) my_soa = dns.Record_SOA( mname = 'my-domain.com', rname = 'postmaster.test-domain.com', serial = 130, refresh = 12345, minimum = 1, expire = 999999, retry = 100, ) test_domain_com = NoFileAuthority( soa = ('test-domain.com', soa_record), records = { 'test-domain.com': [ soa_record, dns.Record_A('127.0.0.1'), dns.Record_NS('39.28.189.39'), dns.Record_MX(10, 'host.test-domain.com'), dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'), dns.Record_CNAME('canonical.name.com'), dns.Record_MB('mailbox.test-domain.com'), dns.Record_MG('mail.group.someplace'), dns.Record_TXT('A First piece of Text', 'a SecoNd piece'), dns.Record_A6(0, 'ABCD::4321', ''), dns.Record_A6(12, '0:0069::0', 'some.network.tld'), dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net'), dns.Record_TXT('Some more text, haha! Yes. \0 Still here?'), dns.Record_MR('mail.redirect.or.whatever'), dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'), dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'), dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'), dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01'), dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')], 'http.tcp.test-domain.com': [ dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool') ], 'host.test-domain.com': [ dns.Record_A('123.242.1.5'), dns.Record_A('0.255.0.255'), ], 'host-two.test-domain.com': [ # # Python bug # dns.Record_A('255.255.255.255'), # dns.Record_A('255.255.255.254'), dns.Record_A('0.0.0.0') ], 'cname.test-domain.com': [ dns.Record_CNAME('test-domain.com') ], 'anothertest-domain.com': [ dns.Record_A('1.2.3.4')], } ) reverse_domain = NoFileAuthority( soa = ('93.84.28.in-addr.arpa', reverse_soa), records = { '123.93.84.28.in-addr.arpa': [ dns.Record_PTR('test.host-reverse.lookup.com'), reverse_soa ] } ) my_domain_com = NoFileAuthority( soa = ('my-domain.com', my_soa), records = { 'my-domain.com': [ my_soa, dns.Record_A('1.2.3.4', ttl='1S'), dns.Record_NS('ns1.domain', ttl='2M'), dns.Record_NS('ns2.domain', ttl='3H'), dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D') ] } ) class ServerDNSTestCase(unittest.TestCase): """Test cases for DNS server and client.""" def setUp(self): self.factory = server.DNSServerFactory([ test_domain_com, reverse_domain, my_domain_com ], verbose=2) p = dns.DNSDatagramProtocol(self.factory) while 1: self.listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1") port = self.listenerTCP.getHost().port try: self.listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1") except error.CannotListenError: self.listenerTCP.stopListening() else: break self.resolver = client.Resolver(servers=[('127.0.0.1', port)]) def tearDown(self): """Asynchronously disconnect listenerTCP, listenerUDP and resolver""" d1 = self.listenerTCP.loseConnection() d2 = defer.maybeDeferred(self.listenerUDP.stopListening) d = defer.gatherResults([d1, d2]) def disconnectTransport(ignored): if getattr(self.resolver.protocol, 'transport', None) is not None: return self.resolver.protocol.transport.stopListening() d.addCallback(disconnectTransport) d.addCallback(lambda x : self.failUnless( self.listenerUDP.disconnected and self.listenerTCP.disconnected)) return d def namesTest(self, d, r): self.response = None def setDone(response): self.response = response def checkResults(ignored): if isinstance(self.response, failure.Failure): raise self.response results = justPayload(self.response) assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r)) for rec in results: assert rec in r, "%s not in %s" % (rec, map(str, r)) d.addBoth(setDone) d.addCallback(checkResults) return d def testAddressRecord1(self): """Test simple DNS 'A' record queries""" return self.namesTest( self.resolver.lookupAddress('test-domain.com'), [dns.Record_A('127.0.0.1', ttl=19283784)] ) def testAddressRecord2(self): """Test DNS 'A' record queries with multiple answers""" return self.namesTest( self.resolver.lookupAddress('host.test-domain.com'), [dns.Record_A('123.242.1.5', ttl=19283784), dns.Record_A('0.255.0.255', ttl=19283784)] ) def testAdressRecord3(self): """Test DNS 'A' record queries with edge cases""" return self.namesTest( self.resolver.lookupAddress('host-two.test-domain.com'), [dns.Record_A('255.255.255.254', ttl=19283784), dns.Record_A('0.0.0.0', ttl=19283784)] ) def testAuthority(self): """Test DNS 'SOA' record queries""" return self.namesTest( self.resolver.lookupAuthority('test-domain.com'), [soa_record] ) def testMailExchangeRecord(self): """Test DNS 'MX' record queries""" return self.namesTest( self.resolver.lookupMailExchange('test-domain.com'), [dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)] ) def testNameserver(self): """Test DNS 'NS' record queries""" return self.namesTest( self.resolver.lookupNameservers('test-domain.com'), [dns.Record_NS('39.28.189.39', ttl=19283784)] ) def testHINFO(self): """Test DNS 'HINFO' record queries""" return self.namesTest( self.resolver.lookupHostInfo('test-domain.com'), [dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)] ) def testPTR(self): """Test DNS 'PTR' record queries""" return self.namesTest( self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'), [dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)] ) def testCNAME(self): """Test DNS 'CNAME' record queries""" return self.namesTest( self.resolver.lookupCanonicalName('test-domain.com'), [dns.Record_CNAME('canonical.name.com', ttl=19283784)] ) def testCNAMEAdditional(self): """Test additional processing for CNAME records""" return self.namesTest( self.resolver.lookupAddress('cname.test-domain.com'), [dns.Record_CNAME('test-domain.com', ttl=19283784), dns.Record_A('127.0.0.1', ttl=19283784)] ) def testMB(self): """Test DNS 'MB' record queries""" return self.namesTest( self.resolver.lookupMailBox('test-domain.com'), [dns.Record_MB('mailbox.test-domain.com', ttl=19283784)] ) def testMG(self): """Test DNS 'MG' record queries""" return self.namesTest( self.resolver.lookupMailGroup('test-domain.com'), [dns.Record_MG('mail.group.someplace', ttl=19283784)] ) def testMR(self): """Test DNS 'MR' record queries""" return self.namesTest( self.resolver.lookupMailRename('test-domain.com'), [dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)] ) def testMINFO(self): """Test DNS 'MINFO' record queries""" return self.namesTest( self.resolver.lookupMailboxInfo('test-domain.com'), [dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)] ) def testSRV(self): """Test DNS 'SRV' record queries""" return self.namesTest( self.resolver.lookupService('http.tcp.test-domain.com'), [dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)] ) def testAFSDB(self): """Test DNS 'AFSDB' record queries""" return self.namesTest( self.resolver.lookupAFSDatabase('test-domain.com'), [dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)] ) def testRP(self): """Test DNS 'RP' record queries""" return self.namesTest( self.resolver.lookupResponsibility('test-domain.com'), [dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)] ) def testTXT(self): """Test DNS 'TXT' record queries""" return self.namesTest( self.resolver.lookupText('test-domain.com'), [dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784), dns.Record_TXT('Some more text, haha! Yes. \0 Still here?', ttl=19283784)] ) def testWKS(self): """Test DNS 'WKS' record queries""" return self.namesTest( self.resolver.lookupWellKnownServices('test-domain.com'), [dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)] ) def testSomeRecordsWithTTLs(self): result_soa = copy.copy(my_soa) result_soa.ttl = my_soa.expire return self.namesTest( self.resolver.lookupAllRecords('my-domain.com'), [result_soa, dns.Record_A('1.2.3.4', ttl='1S'), dns.Record_NS('ns1.domain', ttl='2M'), dns.Record_NS('ns2.domain', ttl='3H'), dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')] ) def testAAAA(self): """Test DNS 'AAAA' record queries (IPv6)""" return self.namesTest( self.resolver.lookupIPV6Address('test-domain.com'), [dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)] ) def testA6(self): """Test DNS 'A6' record queries (IPv6)""" return self.namesTest( self.resolver.lookupAddress6('test-domain.com'), [dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784), dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784), dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)] ) def testZoneTransfer(self): """Test DNS 'AXFR' queries (Zone transfer)""" default_ttl = soa_record.expire results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())] for r in results: if r.ttl is None: r.ttl = default_ttl return self.namesTest( self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)), results ) def testSimilarZonesDontInterfere(self): """Tests that unrelated zones don't mess with each other.""" return self.namesTest( self.resolver.lookupAddress("anothertest-domain.com"), [dns.Record_A('1.2.3.4', ttl=19283784)] ) class HelperTestCase(unittest.TestCase): def testSerialGenerator(self): f = self.mktemp() a = authority.getSerial(f) for i in range(20): b = authority.getSerial(f) self.failUnless(a < b) a = b class AXFRTest(unittest.TestCase): def setUp(self): self.results = None self.d = defer.Deferred() self.d.addCallback(self._gotResults) self.controller = client.AXFRController('fooby.com', self.d) self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False, payload=dns.Record_SOA(mname='fooby.com', rname='hooj.fooby.com', serial=100, refresh=200, retry=300, expire=400, minimum=500, ttl=600)) self.records = [ self.soa, dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False, payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)), dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False, payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)), dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False, payload=dns.Record_A(address='64.123.27.105', ttl=700)), self.soa ] def _makeMessage(self): # hooray they all have the same message format return dns.Message(id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0, trunc=0, maxSize=0) def testBindAndTNamesStyle(self): # Bind style = One big single message m = self._makeMessage() m.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)] m.answers = self.records self.controller.messageReceived(m, None) self.assertEquals(self.results, self.records) def _gotResults(self, result): self.results = result def testDJBStyle(self): # DJB style = message per record records = self.records[:] while records: m = self._makeMessage() m.queries = [] # DJB *doesn't* specify any queries.. hmm.. m.answers = [records.pop(0)] self.controller.messageReceived(m, None) self.assertEquals(self.results, self.records) class HostsTestCase(unittest.TestCase): def setUp(self): f = open('EtcHosts', 'w') f.write(''' 1.1.1.1 EXAMPLE EXAMPLE.EXAMPLETHING 1.1.1.2 HOOJY ::1 ip6thingy ''') f.close() self.resolver = hosts.Resolver('EtcHosts') def testGetHostByName(self): data = [('EXAMPLE', '1.1.1.1'), ('EXAMPLE.EXAMPLETHING', '1.1.1.1'), ('HOOJY', '1.1.1.2'), ] ds = [self.resolver.getHostByName(n).addCallback(self.assertEqual, ip) for n, ip in data] return defer.gatherResults(ds) def testLookupAddress(self): d = self.resolver.lookupAddress('HOOJY') d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(), '1.1.1.2')) return d def testIPv6(self): d = self.resolver.lookupIPV6Address('ip6thingy') d.addCallback(self.assertEqual, '::1') return d testIPv6.skip = 'IPv6 support is not in our hosts resolver yet' def testNotImplemented(self): return self.assertFailure(self.resolver.lookupMailExchange('EXAMPLE'), NotImplementedError) def testQuery(self): d = self.resolver.query(dns.Query('EXAMPLE')) d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(), '1.1.1.1')) return d def testNotFound(self): return self.assertFailure(self.resolver.lookupAddress('foueoa'), dns.DomainError) class FakeDNSDatagramProtocol(object): transport = object() def __init__(self): self.queries = [] def query(self, address, queries, timeout=10, id=None): self.queries.append((address, queries, timeout, id)) return defer.fail(dns.DNSQueryTimeoutError(queries)) def removeResend(self, id): # Ignore this for the time being. pass class RetryLogic(unittest.TestCase): testServers = [ '1.2.3.4', '4.3.2.1', 'a.b.c.d', 'z.y.x.w'] def testRoundRobinBackoff(self): addrs = [(x, 53) for x in self.testServers] r = client.Resolver(resolv=None, servers=addrs) r.protocol = proto = FakeDNSDatagramProtocol() return r.lookupAddress("foo.example.com" ).addCallback(self._cbRoundRobinBackoff ).addErrback(self._ebRoundRobinBackoff, proto ) def _cbRoundRobinBackoff(self, result): raise unittest.FailTest("Lookup address succeeded, should have timed out") def _ebRoundRobinBackoff(self, failure, fakeProto): failure.trap(defer.TimeoutError) # Assert that each server is tried with a particular timeout # before the timeout is increased and the attempts are repeated. for t in (1, 3, 11, 45): tries = fakeProto.queries[:len(self.testServers)] del fakeProto.queries[:len(self.testServers)] tries.sort() expected = list(self.testServers) expected.sort() for ((addr, query, timeout, id), expectedAddr) in zip(tries, expected): self.assertEquals(addr, (expectedAddr, 53)) self.assertEquals(timeout, t) self.failIf(fakeProto.queries) class ResolvConfHandling(unittest.TestCase): def testMissing(self): resolvConf = self.mktemp() r = client.Resolver(resolv=resolvConf) self.assertEquals(r.dynServers, [('127.0.0.1', 53)]) r._parseCall.cancel() def testEmpty(self): resolvConf = self.mktemp() fObj = file(resolvConf, 'w') fObj.close() r = client.Resolver(resolv=resolvConf) self.assertEquals(r.dynServers, [('127.0.0.1', 53)]) r._parseCall.cancel() class FilterAnswersTests(unittest.TestCase): """ Test L{twisted.names.client.Resolver.filterAnswers}'s handling of various error conditions it might encounter. """ def setUp(self): # Create a resolver pointed at an invalid server - we won't be hitting # the network in any of these tests. self.resolver = Resolver(servers=[('0.0.0.0', 0)]) def test_truncatedMessage(self): """ Test that a truncated message results in an equivalent request made via TCP. """ m = Message(trunc=True) m.addQuery('example.com') def queryTCP(queries): self.assertEqual(queries, m.queries) response = Message() response.answers = ['answer'] response.authority = ['authority'] response.additional = ['additional'] return succeed(response) self.resolver.queryTCP = queryTCP d = self.resolver.filterAnswers(m) d.addCallback( self.assertEqual, (['answer'], ['authority'], ['additional'])) return d def _rcodeTest(self, rcode, exc): m = Message(rCode=rcode) err = self.resolver.filterAnswers(m) err.trap(exc) def test_formatError(self): """ Test that a message with a result code of C{EFORMAT} results in a failure wrapped around L{DNSFormatError}. """ return self._rcodeTest(EFORMAT, DNSFormatError) def test_serverError(self): """ Like L{test_formatError} but for C{ESERVER}/L{DNSServerError}. """ return self._rcodeTest(ESERVER, DNSServerError) def test_nameError(self): """ Like L{test_formatError} but for C{ENAME}/L{DNSNameError}. """ return self._rcodeTest(ENAME, DNSNameError) def test_notImplementedError(self): """ Like L{test_formatError} but for C{ENOTIMP}/L{DNSNotImplementedError}. """ return self._rcodeTest(ENOTIMP, DNSNotImplementedError) def test_refusedError(self): """ Like L{test_formatError} but for C{EREFUSED}/L{DNSQueryRefusedError}. """ return self._rcodeTest(EREFUSED, DNSQueryRefusedError) def test_refusedError(self): """ Like L{test_formatError} but for an unrecognized error code and L{DNSUnknownError}. """ return self._rcodeTest(EREFUSED + 1, DNSUnknownError)