# -*- test-case-name: openid.test.consumer -*-
import urlparse
import cgi
import time
from openid import cryptutil, dh, oidutil, kvform
from openid.consumer.discover import OpenIDServiceEndpoint
from openid.consumer.consumer import \
AuthRequest, GenericConsumer, SUCCESS, FAILURE, CANCEL, SETUP_NEEDED, \
SuccessResponse, FailureResponse, SetupNeededResponse, CancelResponse, \
DiffieHellmanConsumerSession, Consumer
from openid import association
from openid.server.server import \
PlainTextServerSession, DiffieHellmanServerSession
from yadis.manager import Discovery
from openid.consumer import parse
from urljr.fetchers import HTTPResponse, HTTPFetchingError
from urljr import fetchers
import _memstore
assocs = [
('another 20-byte key.', 'Snarky'),
('\x00' * 20, 'Zeros'),
]
def parseQuery(qs):
q = {}
for (k, v) in cgi.parse_qsl(qs):
assert not q.has_key(k)
q[k] = v
return q
def associate(qs, assoc_secret, assoc_handle):
"""Do the server's half of the associate call, using the given
secret and handle."""
q = parseQuery(qs)
assert q['openid.mode'] == 'associate'
assert q['openid.assoc_type'] == 'HMAC-SHA1'
reply_dict = {
'assoc_type':'HMAC-SHA1',
'assoc_handle':assoc_handle,
'expires_in':'600',
}
if q.get('openid.session_type') == 'DH-SHA1':
assert len(q) == 6 or len(q) == 4
session = DiffieHellmanServerSession.fromQuery(q)
reply_dict['session_type'] = 'DH-SHA1'
else:
assert len(q) == 2
session = PlainTextServerSession.fromQuery(q)
reply_dict.update(session.answer(assoc_secret))
return kvform.dictToKV(reply_dict)
class TestFetcher(object):
def __init__(self, user_url, user_page, (assoc_secret, assoc_handle)):
self.get_responses = {user_url:self.response(user_url, 200, user_page)}
self.assoc_secret = assoc_secret
self.assoc_handle = assoc_handle
self.num_assocs = 0
def response(self, url, status, body):
return HTTPResponse(
final_url=url, status=status, headers={}, body=body)
def fetch(self, url, body=None, headers=None):
if body is None:
if url in self.get_responses:
return self.get_responses[url]
else:
try:
body.index('openid.mode=associate')
except ValueError:
pass # fall through
else:
if urlparse.urlparse(url)[0] == 'https':
# Should not be doing DH-SHA1 when using HTTPS.
assert body.find('DH-SHA1') == -1
else:
assert body.find('DH-SHA1') != -1
response = associate(
body, self.assoc_secret, self.assoc_handle)
self.num_assocs += 1
return self.response(url, 200, response)
return self.response(url, 404, 'Not found')
def _test_success(server_url, user_url, delegate_url, links, immediate=False):
store = _memstore.MemoryStore()
if immediate:
mode = 'checkid_immediate'
else:
mode = 'checkid_setup'
endpoint = OpenIDServiceEndpoint()
endpoint.identity_url = user_url
endpoint.server_url = server_url
endpoint.delegate = delegate_url
fetcher = TestFetcher(None, None, assocs[0])
fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
def run():
trust_root = consumer_url
consumer = GenericConsumer(store)
request = consumer.begin(endpoint)
return_to = consumer_url
redirect_url = request.redirectURL(trust_root, return_to, immediate)
parsed = urlparse.urlparse(redirect_url)
qs = parsed[4]
q = parseQuery(qs)
new_return_to = q['openid.return_to']
del q['openid.return_to']
assert q == {
'openid.mode':mode,
'openid.identity':delegate_url,
'openid.trust_root':trust_root,
'openid.assoc_handle':fetcher.assoc_handle,
}, (q, user_url, delegate_url, mode)
assert new_return_to.startswith(return_to)
assert redirect_url.startswith(server_url)
query = {
'nonce':request.return_to_args['nonce'],
'openid.mode':'id_res',
'openid.return_to':new_return_to,
'openid.identity':delegate_url,
'openid.assoc_handle':fetcher.assoc_handle,
}
assoc = store.getAssociation(server_url, fetcher.assoc_handle)
assoc.addSignature(['mode', 'return_to', 'identity'], query)
info = consumer.complete(query, request.endpoint)
assert info.status == SUCCESS, info.message
assert info.identity_url == user_url
assert fetcher.num_assocs == 0
run()
assert fetcher.num_assocs == 1
# Test that doing it again uses the existing association
run()
assert fetcher.num_assocs == 1
# Another association is created if we remove the existing one
store.removeAssociation(server_url, fetcher.assoc_handle)
run()
assert fetcher.num_assocs == 2
# Test that doing it again uses the existing association
run()
assert fetcher.num_assocs == 2
import unittest
http_server_url = 'http://server.example.com/'
consumer_url = 'http://consumer.example.com/'
https_server_url = 'https://server.example.com/'
class TestSuccess(unittest.TestCase):
server_url = http_server_url
user_url = 'http://www.example.com/user.html'
delegate_url = 'http://consumer.example.com/user'
def setUp(self):
self.links = '' % (
self.server_url,)
self.delegate_links = (''
'') % (
self.server_url, self.delegate_url)
def test_nodelegate(self):
_test_success(self.server_url, self.user_url,
self.user_url, self.links)
def test_nodelegateImmediate(self):
_test_success(self.server_url, self.user_url,
self.user_url, self.links, True)
def test_delegate(self):
_test_success(self.server_url, self.user_url,
self.delegate_url, self.delegate_links)
def test_delegateImmediate(self):
_test_success(self.server_url, self.user_url,
self.delegate_url, self.delegate_links, True)
class TestSuccessHTTPS(TestSuccess):
server_url = https_server_url
class TestConstruct(unittest.TestCase):
def setUp(self):
self.store_sentinel = object()
def test_construct(self):
oidc = GenericConsumer(self.store_sentinel)
self.failUnless(oidc.store is self.store_sentinel)
def test_nostore(self):
self.failUnlessRaises(TypeError, GenericConsumer)
class TestIdRes(unittest.TestCase):
consumer_class = GenericConsumer
def setUp(self):
self.store = _memstore.MemoryStore()
self.consumer = self.consumer_class(self.store)
self.return_to = "nonny"
self.endpoint = OpenIDServiceEndpoint()
self.endpoint.identity_url = self.consumer_id = "consu"
self.endpoint.server_url = self.server_url = "serlie"
self.endpoint.delegate = self.server_id = "sirod"
class TestQueryFormat(TestIdRes):
def test_notAList(self):
# Value should be a single string. If it's a list, it should generate
# an exception.
query = {'openid.mode': ['cancel']}
try:
r = self.consumer.complete(query, None)
except TypeError, err:
self.failUnless(str(err).find('values') != -1, err)
else:
self.fail("expected TypeError, got this instead: %s" % (r,))
class TestComplete(TestIdRes):
def test_cancel(self):
query = {'openid.mode': 'cancel'}
r = self.consumer.complete(query, self.endpoint)
self.failUnlessEqual(r.status, CANCEL)
self.failUnless(r.identity_url == self.endpoint.identity_url)
def test_error(self):
msg = 'an error message'
query = {'openid.mode': 'error',
'openid.error': msg,
}
r = self.consumer.complete(query, self.endpoint)
self.failUnlessEqual(r.status, FAILURE)
self.failUnless(r.identity_url == self.endpoint.identity_url)
self.failUnlessEqual(r.message, msg)
def test_noMode(self):
query = {}
r = self.consumer.complete(query, self.endpoint)
self.failUnlessEqual(r.status, FAILURE)
self.failUnless(r.identity_url == self.endpoint.identity_url)
def test_idResMissingField(self):
query = {'openid.mode': 'id_res'}
r = self.consumer.complete(query, self.endpoint)
self.failUnlessEqual(r.status, FAILURE)
self.failUnlessEqual(r.identity_url, self.consumer_id)
def test_idResURLMismatch(self):
query = {'openid.mode': 'id_res',
'openid.return_to': 'return_to (just anything)',
'openid.identity': 'something wrong (not self.consumer_id)',
'openid.assoc_handle': 'does not matter',
}
r = self.consumer.complete(query, self.endpoint)
self.failUnlessEqual(r.status, FAILURE)
self.failUnlessEqual(r.identity_url, self.consumer_id)
r.message.index('delegate')
class TestCheckAuthResponse(TestIdRes):
def _createAssoc(self):
issued = time.time()
lifetime = 1000
assoc = association.Association(
'handle', 'secret', issued, lifetime, 'HMAC-SHA1')
store = self.consumer.store
store.storeAssociation(self.server_url, assoc)
assoc2 = store.getAssociation(self.server_url)
self.failUnlessEqual(assoc, assoc2)
def test_goodResponse(self):
"""successful response to check_authentication"""
response = {
'is_valid':'true',
}
r = self.consumer._processCheckAuthResponse(response, self.server_url)
self.failUnless(r)
def test_missingAnswer(self):
"""check_authentication returns false when the server sends no answer"""
response = {
}
r = self.consumer._processCheckAuthResponse(response, self.server_url)
self.failIf(r)
def test_badResponse(self):
"""check_authentication returns false when is_valid is false"""
response = {
'is_valid':'false',
}
r = self.consumer._processCheckAuthResponse(response, self.server_url)
self.failIf(r)
def test_badResponseInvalidate(self):
"""Make sure that the handle is invalidated when is_valid is false"""
self._createAssoc()
response = {
'is_valid':'false',
'invalidate_handle':'handle',
}
r = self.consumer._processCheckAuthResponse(response, self.server_url)
self.failIf(r)
self.failUnless(
self.consumer.store.getAssociation(self.server_url) is None)
def test_invalidateMissing(self):
"""invalidate_handle with a handle that is not present"""
response = {
'is_valid':'true',
'invalidate_handle':'missing',
}
r = self.consumer._processCheckAuthResponse(response, self.server_url)
self.failUnless(r)
def test_invalidatePresent(self):
"""invalidate_handle with a handle that exists"""
self._createAssoc()
response = {
'is_valid':'true',
'invalidate_handle':'handle',
}
r = self.consumer._processCheckAuthResponse(response, self.server_url)
self.failUnless(r)
self.failUnless(
self.consumer.store.getAssociation(self.server_url) is None)
class IdResFetchFailingConsumer(GenericConsumer):
message = 'fetch failed'
def _doIdRes(self, *args, **kwargs):
raise HTTPFetchingError(self.message)
class TestFetchErrorInIdRes(TestIdRes):
consumer_class = IdResFetchFailingConsumer
def test_idResFailure(self):
query = {'openid.mode': 'id_res'}
r = self.consumer.complete(query, self.endpoint)
self.failUnlessEqual(r.status, FAILURE)
self.failUnlessEqual(r.identity_url, self.consumer_id)
r.message.index(IdResFetchFailingConsumer.message)
class TestSetupNeeded(TestIdRes):
def test_setupNeeded(self):
setup_url = 'http://unittest/setup-here'
query = {
'openid.mode': 'id_res',
'openid.user_setup_url': setup_url,
}
ret = self.consumer._doIdRes(query, self.endpoint,)
self.failUnlessEqual(ret.status, SETUP_NEEDED)
self.failUnlessEqual(ret.setup_url, setup_url)
class CheckAuthHappened(Exception): pass
class CheckAuthDetectingConsumer(GenericConsumer):
def _checkAuth(self, *args):
raise CheckAuthHappened(args)
class CatchLogs(object):
def setUp(self):
self.old_logger = oidutil.log
oidutil.log = self.gotLogMessage
self.messages = []
def gotLogMessage(self, message):
self.messages.append(message)
def tearDown(self):
oidutil.log = self.old_logger
class CheckNonceTest(TestIdRes, CatchLogs):
def setUp(self):
CatchLogs.setUp(self)
TestIdRes.setUp(self)
self.nonce = self.id()
self.store.storeNonce(self.nonce)
def tearDown(self):
CatchLogs.tearDown(self)
def test_goodNonce(self):
self.return_to = 'http://rt.unittest/?nonce=%s' % (self.nonce,)
self.response = SuccessResponse(self.endpoint,
{'openid.return_to': self.return_to})
ret = self.consumer._checkNonce(self.response, self.nonce)
self.failUnlessEqual(ret.status, SUCCESS)
self.failUnlessEqual(ret.identity_url, self.consumer_id)
def test_badNonce(self):
# remove the nonce from the store
self.store.useNonce(self.nonce)
self.return_to = 'http://rt.unittest/?nonce=%s' % (self.nonce,)
self.response = SuccessResponse(self.endpoint,
{'openid.return_to': self.return_to})
ret = self.consumer._checkNonce(self.response, self.nonce)
self.failUnlessEqual(ret.status, FAILURE)
self.failUnlessEqual(ret.identity_url, self.consumer_id)
self.failUnless(ret.message.startswith('Nonce missing from store'),
ret.message)
def test_tamperedNonce(self):
self.return_to = 'http://rt.unittest/?nonce=HACKED-%s' % (self.nonce,)
self.response = SuccessResponse(self.endpoint,
{'openid.return_to': self.return_to})
ret = self.consumer._checkNonce(self.response, self.nonce)
self.failUnlessEqual(ret.status, FAILURE)
self.failUnlessEqual(ret.identity_url, self.consumer_id)
self.failUnless(ret.message.startswith('Nonce mismatch'), ret.message)
def test_missingNonce(self):
# no nonce parameter on the return_to
self.response = SuccessResponse(self.endpoint,
{'openid.return_to': self.return_to})
ret = self.consumer._checkNonce(self.response, self.nonce)
self.failUnlessEqual(ret.status, FAILURE)
self.failUnlessEqual(ret.identity_url, self.consumer_id)
self.failUnless(ret.message.startswith('Nonce missing from return_to'))
class TestCheckAuthTriggered(TestIdRes, CatchLogs):
consumer_class = CheckAuthDetectingConsumer
def setUp(self):
TestIdRes.setUp(self)
CatchLogs.setUp(self)
def _doIdRes(self, query):
return self.consumer._doIdRes(query, self.endpoint)
def test_checkAuthTriggered(self):
query = {
'openid.return_to':self.return_to,
'openid.identity':self.server_id,
'openid.assoc_handle':'not_found',
}
try:
result = self._doIdRes(query)
except CheckAuthHappened:
pass
else:
self.fail('_checkAuth did not happen. Result was: %r %s' %
(result, self.messages))
def test_checkAuthTriggeredWithAssoc(self):
# Store an association for this server that does not match the
# handle that is in the query
issued = time.time()
lifetime = 1000
assoc = association.Association(
'handle', 'secret', issued, lifetime, 'HMAC-SHA1')
self.store.storeAssociation(self.server_url, assoc)
query = {
'openid.return_to':self.return_to,
'openid.identity':self.server_id,
'openid.assoc_handle':'not_found',
}
try:
result = self._doIdRes(query)
except CheckAuthHappened:
pass
else:
self.fail('_checkAuth did not happen. Result was: %r' % (result,))
def test_expiredAssoc(self):
# Store an expired association for the server with the handle
# that is in the query
issued = time.time() - 10
lifetime = 0
handle = 'handle'
assoc = association.Association(
handle, 'secret', issued, lifetime, 'HMAC-SHA1')
self.failUnless(assoc.expiresIn <= 0)
self.store.storeAssociation(self.server_url, assoc)
query = {
'openid.return_to':self.return_to,
'openid.identity':self.server_id,
'openid.assoc_handle':handle,
}
info = self._doIdRes(query)
self.failUnlessEqual(FAILURE, info.status)
self.failUnlessEqual(self.consumer_id, info.identity_url)
info.message.index('expired') # raises an exception if it's not there
def test_newerAssoc(self):
# Store an expired association for the server with the handle
# that is in the query
lifetime = 1000
good_issued = time.time() - 10
good_handle = 'handle'
good_assoc = association.Association(
good_handle, 'secret', good_issued, lifetime, 'HMAC-SHA1')
self.store.storeAssociation(self.server_url, good_assoc)
bad_issued = time.time() - 5
bad_handle = 'handle2'
bad_assoc = association.Association(
bad_handle, 'secret', bad_issued, lifetime, 'HMAC-SHA1')
self.store.storeAssociation(self.server_url, bad_assoc)
query = {
'openid.return_to':self.return_to,
'openid.identity':self.server_id,
'openid.assoc_handle':good_handle,
}
good_assoc.addSignature(['return_to', 'identity'], query)
info = self._doIdRes(query)
self.failUnlessEqual(info.status, SUCCESS)
self.failUnlessEqual(self.consumer_id, info.identity_url)
class MockFetcher(object):
def __init__(self, response=None):
self.response = response or HTTPResponse()
self.fetches = []
def fetch(self, url, body=None, headers=None):
self.fetches.append((url, body, headers))
return self.response
class ExceptionRaisingMockFetcher(object):
def fetch(self, url, body=None, headers=None):
raise HTTPFetchingError('mock fetcher exception')
class BadArgCheckingConsumer(GenericConsumer):
def _makeKVPost(self, args, _):
assert args == {
'openid.mode':'check_authentication',
'openid.signed':'foo',
}, args
return None
class TestCheckAuth(unittest.TestCase, CatchLogs):
consumer_class = GenericConsumer
def setUp(self):
CatchLogs.setUp(self)
self.store = _memstore.MemoryStore()
self.consumer = self.consumer_class(self.store)
self.fetcher = MockFetcher()
fetchers.setDefaultFetcher(self.fetcher)
def test_error(self):
self.fetcher.response = HTTPResponse(
"http://some_url", 404, {'Hea': 'der'}, 'blah:blah\n')
query = {'openid.signed': 'stuff, things'}
r = self.consumer._checkAuth(query, http_server_url)
self.failIf(r)
self.failUnless(self.messages)
def test_bad_args(self):
query = {
'openid.signed':'foo',
'closid.foo':'something',
}
consumer = BadArgCheckingConsumer(self.store)
consumer._checkAuth(query, 'does://not.matter')
class TestFetchAssoc(unittest.TestCase, CatchLogs):
consumer_class = GenericConsumer
def setUp(self):
CatchLogs.setUp(self)
self.store = _memstore.MemoryStore()
self.fetcher = MockFetcher()
fetchers.setDefaultFetcher(self.fetcher)
self.consumer = self.consumer_class(self.store)
def test_error(self):
self.fetcher.response = HTTPResponse(
"http://some_url", 404, {'Hea': 'der'}, 'blah:blah\n')
r = self.consumer._makeKVPost({'openid.mode':'associate'},
"http://server_url")
self.failUnlessEqual(r, None)
self.failUnless(self.messages)
def test_error_exception(self):
self.fetcher = ExceptionRaisingMockFetcher()
fetchers.setDefaultFetcher(self.fetcher)
self.failUnlessRaises(fetchers.HTTPFetchingError,
self.consumer._makeKVPost,
{'openid.mode':'associate'},
"http://server_url")
# exception fetching returns no association
self.failUnless(self.consumer._getAssociation('some://url') is None)
self.failUnlessRaises(fetchers.HTTPFetchingError,
self.consumer._checkAuth,
{'openid.signed':''},
'some://url')
class TestAuthRequest(unittest.TestCase):
def setUp(self):
self.endpoint = OpenIDServiceEndpoint()
self.endpoint.delegate = 'http://server.unittest/joe'
self.endpoint.server_url = 'http://server.unittest/'
self.assoc = self
self.assoc.handle = 'assoc@handle'
self.authreq = AuthRequest(self.endpoint, self.assoc)
def test_addExtensionArg(self):
self.authreq.addExtensionArg('bag', 'color', 'brown')
self.authreq.addExtensionArg('bag', 'material', 'paper')
self.failUnlessEqual(self.authreq.extra_args,
{'openid.bag.color': 'brown',
'openid.bag.material': 'paper'})
url = self.authreq.redirectURL('http://7.utest/', 'http://7.utest/r')
self.failUnless(url.find('openid.bag.color=brown') != -1,
'extension arg not found in %s' % (url,))
self.failUnless(url.find('openid.bag.material=paper') != -1,
'extension arg not found in %s' % (url,))
class TestSuccessResponse(unittest.TestCase):
def setUp(self):
self.endpoint = OpenIDServiceEndpoint()
self.endpoint.identity_url = 'identity_url'
def test_extensionResponse(self):
resp = SuccessResponse(self.endpoint, {
'openid.unittest.one':'1',
'openid.unittest.two':'2',
'openid.sreg.nickname':'j3h',
'openid.return_to':'return_to',
})
utargs = resp.extensionResponse('unittest')
self.failUnlessEqual(utargs, {'one':'1', 'two':'2'})
sregargs = resp.extensionResponse('sreg')
self.failUnlessEqual(sregargs, {'nickname':'j3h'})
def test_noReturnTo(self):
resp = SuccessResponse(self.endpoint, {})
self.failUnless(resp.getReturnTo() is None)
def test_returnTo(self):
resp = SuccessResponse(self.endpoint, {'openid.return_to':'return_to'})
self.failUnlessEqual(resp.getReturnTo(), 'return_to')
class TestParseAssociation(TestIdRes):
secret = 'x' * 20
def test_missing(self):
# Missing required arguments
result = self.consumer._parseAssociation({}, None, 'server_url')
self.failUnless(result is None)
def _setUpDH(self):
sess, args = \
self.consumer._createAssociateRequest(self.server_url)
server_sess = DiffieHellmanServerSession.fromQuery(args)
server_resp = server_sess.answer(self.secret)
server_resp['assoc_type'] = 'HMAC-SHA1'
server_resp['assoc_handle'] = 'handle'
server_resp['expires_in'] = '1000'
server_resp['session_type'] = 'DH-SHA1'
return sess, server_resp
def test_success(self):
sess, server_resp = self._setUpDH()
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failIf(ret is None)
self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1')
self.failUnlessEqual(ret.secret, self.secret)
self.failUnlessEqual(ret.handle, 'handle')
self.failUnlessEqual(ret.lifetime, 1000)
def test_badAssocType(self):
sess, server_resp = self._setUpDH()
server_resp['assoc_type'] = 'Crazy Low Prices!!!'
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failUnless(ret is None)
def test_badExpiresIn(self):
sess, server_resp = self._setUpDH()
server_resp['expires_in'] = 'Crazy Low Prices!!!'
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failUnless(ret is None)
def test_badSessionType(self):
sess, server_resp = self._setUpDH()
server_resp['session_type'] = '|/iA6rA'
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failUnless(ret is None)
def test_plainFallback(self):
sess = DiffieHellmanConsumerSession()
server_resp = {
'assoc_type': 'HMAC-SHA1',
'assoc_handle': 'handle',
'expires_in': '1000',
'mac_key': oidutil.toBase64(self.secret),
}
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failIf(ret is None)
self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1')
self.failUnlessEqual(ret.secret, self.secret)
self.failUnlessEqual(ret.handle, 'handle')
self.failUnlessEqual(ret.lifetime, 1000)
def test_plainFallbackFailure(self):
sess = DiffieHellmanConsumerSession()
# missing mac_key
server_resp = {
'assoc_type': 'HMAC-SHA1',
'assoc_handle': 'handle',
'expires_in': '1000',
}
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failUnless(ret is None)
def test_badDHValues(self):
sess, server_resp = self._setUpDH()
server_resp['enc_mac_key'] = '\x00\x00\x00'
ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
self.failUnless(ret is None)
class StubConsumer(object):
def __init__(self):
self.assoc = object()
self.response = None
self.endpoint = None
def begin(self, service):
auth_req = AuthRequest(service, self.assoc)
self.endpoint = service
return auth_req
def complete(self, query, endpoint):
assert endpoint is self.endpoint
return self.response
class ConsumerTest(unittest.TestCase):
def setUp(self):
self.endpoint = OpenIDServiceEndpoint()
self.endpoint.identity_url = self.identity_url = 'http://identity.url/'
self.store = None
self.session = {}
self.consumer = Consumer(self.session, self.store)
self.consumer.consumer = StubConsumer()
self.discovery = Discovery(self.session,
self.identity_url,
self.consumer.session_key_prefix)
def test_beginWithoutDiscovery(self):
# Does this really test anything non-trivial?
result = self.consumer.beginWithoutDiscovery(self.endpoint)
# The result is an auth request
self.failUnless(isinstance(result, AuthRequest))
# Side-effect of calling beginWithoutDiscovery is setting the
# session value to the endpoint attribute of the result
self.failUnless(self.session[self.consumer._token_key] is result.endpoint)
# The endpoint that we passed in is the endpoint on the auth_request
self.failUnless(result.endpoint is self.endpoint)
def test_completeEmptySession(self):
response = self.consumer.complete({})
self.failUnlessEqual(response.status, FAILURE)
self.failUnless(response.identity_url is None)
def _doResp(self, auth_req, exp_resp):
"""complete a transaction, using the expected response from
the generic consumer."""
self.consumer.consumer.response = exp_resp
# endpoint is stored in the session
self.failUnless(self.session)
resp = self.consumer.complete({})
# All responses should have the same identity URL, and the
# session should be cleaned out
self.failUnless(resp.identity_url is self.identity_url)
self.failIf(self.consumer._token_key in self.session)
# Expected status response
self.failUnlessEqual(resp.status, exp_resp.status)
return resp
def _doRespNoDisco(self, exp_resp):
"""Set up a transaction without discovery"""
auth_req = self.consumer.beginWithoutDiscovery(self.endpoint)
resp = self._doResp(auth_req, exp_resp)
# There should be nothing left in the session once we have completed.
self.failIf(self.session)
return resp
def test_noDiscoCompleteSuccessWithToken(self):
self._doRespNoDisco(SuccessResponse(self.endpoint, {}))
def test_noDiscoCompleteCancelWithToken(self):
self._doRespNoDisco(CancelResponse(self.endpoint))
def test_noDiscoCompleteFailure(self):
msg = 'failed!'
resp = self._doRespNoDisco(FailureResponse(self.endpoint, msg))
self.failUnless(resp.message is msg)
def test_noDiscoCompleteSetupNeeded(self):
setup_url = 'http://setup.url/'
resp = self._doRespNoDisco(
SetupNeededResponse(self.endpoint, setup_url))
self.failUnless(resp.setup_url is setup_url)
# To test that discovery is cleaned up, we need to initialize a
# Yadis manager, and have it put its values in the session.
def _doRespDisco(self, is_clean, exp_resp):
"""Set up and execute a transaction, with discovery"""
self.discovery.createManager([self.endpoint], self.identity_url)
auth_req = self.consumer.begin(self.identity_url)
resp = self._doResp(auth_req, exp_resp)
manager = self.discovery.getManager()
if is_clean:
self.failUnless(self.discovery.getManager() is None, manager)
else:
self.failIf(self.discovery.getManager() is None, manager)
return resp
# Cancel and success DO clean up the discovery process
def test_completeSuccess(self):
self._doRespDisco(True, SuccessResponse(self.endpoint, {}))
def test_completeCancel(self):
self._doRespDisco(True, CancelResponse(self.endpoint))
# Failure and setup_needed don't clean up the discovery process
def test_completeFailure(self):
msg = 'failed!'
resp = self._doRespDisco(False, FailureResponse(self.endpoint, msg))
self.failUnless(resp.message is msg)
def test_completeSetupNeeded(self):
setup_url = 'http://setup.url/'
resp = self._doRespDisco(
False,
SetupNeededResponse(self.endpoint, setup_url))
self.failUnless(resp.setup_url is setup_url)
def test_begin(self):
self.discovery.createManager([self.endpoint], self.identity_url)
# Should not raise an exception
auth_req = self.consumer.begin(self.identity_url)
self.failUnless(isinstance(auth_req, AuthRequest))
self.failUnless(auth_req.endpoint is self.endpoint)
self.failUnless(auth_req.endpoint is self.consumer.consumer.endpoint)
self.failUnless(auth_req.assoc is self.consumer.consumer.assoc)
if __name__ == '__main__':
unittest.main()