# -*- test-case-name: openid.test.test_discover -*-
from urljr import fetchers
from openid import oidutil
# If the Yadis library is available, use it. Otherwise, only use
# old-style discovery.
try:
import yadis
except ImportError:
yadis_available = False
oidutil.log('Consumer operating without Yadis support '
'(failed to import Yadis library)')
class DiscoveryFailure(RuntimeError):
"""A failure to discover an OpenID server.
When the C{yadis} package is available, this is
C{yadis.discover.DiscoveryFailure}."""
else:
yadis_available = True
from yadis.etxrd import nsTag, XRDSError
from yadis.services import applyFilter as extractServices
from yadis.discover import discover as yadisDiscover
from yadis.discover import DiscoveryFailure
from yadis import xrires, filters
from openid.consumer.parse import openIDDiscover as parseOpenIDLinkRel
from openid.consumer.parse import ParseError
OPENID_1_0_NS = 'http://openid.net/xmlns/1.0'
OPENID_1_2_TYPE = 'http://openid.net/signon/1.2'
OPENID_1_1_TYPE = 'http://openid.net/signon/1.1'
OPENID_1_0_TYPE = 'http://openid.net/signon/1.0'
class OpenIDServiceEndpoint(object):
"""Object representing an OpenID service endpoint.
@ivar identity_url: the verified identifier.
@ivar canonicalID: For XRI, the persistent identifier.
"""
openid_type_uris = [
OPENID_1_2_TYPE,
OPENID_1_1_TYPE,
OPENID_1_0_TYPE,
]
def __init__(self):
self.identity_url = None
self.server_url = None
self.type_uris = []
self.delegate = None
self.canonicalID = None
self.used_yadis = False # whether this came from an XRDS
def usesExtension(self, extension_uri):
return extension_uri in self.type_uris
def parseService(self, yadis_url, uri, type_uris, service_element):
"""Set the state of this object based on the contents of the
service element."""
self.type_uris = type_uris
self.identity_url = yadis_url
self.server_url = uri
self.delegate = findDelegate(service_element)
self.used_yadis = True
def getServerID(self):
"""Return the identifier that should be sent as the
openid.identity_url parameter to the server."""
if self.delegate is None:
return self.canonicalID or self.identity_url
else:
return self.delegate
def fromBasicServiceEndpoint(cls, endpoint):
"""Create a new instance of this class from the endpoint
object passed in.
@return: None or OpenIDServiceEndpoint for this endpoint object"""
type_uris = endpoint.matchTypes(cls.openid_type_uris)
# If any Type URIs match and there is an endpoint URI
# specified, then this is an OpenID endpoint
if type_uris and endpoint.uri is not None:
openid_endpoint = cls()
openid_endpoint.parseService(
endpoint.yadis_url,
endpoint.uri,
endpoint.type_uris,
endpoint.service_element)
else:
openid_endpoint = None
return openid_endpoint
fromBasicServiceEndpoint = classmethod(fromBasicServiceEndpoint)
def fromHTML(cls, uri, html):
"""Parse the given document as HTML looking for an OpenID
@raises: openid.consumer.parse.ParseError
"""
delegate_url, server_url = parseOpenIDLinkRel(html)
service = cls()
service.identity_url = uri
service.delegate = delegate_url
service.server_url = server_url
service.type_uris = [OPENID_1_0_TYPE]
return service
fromHTML = classmethod(fromHTML)
def findDelegate(service_element):
"""Extract a openid:Delegate value from a Yadis Service element
represented as an ElementTree Element object. If no delegate is
found, returns None."""
# XXX: should this die if there is more than one delegate element?
delegate_tag = nsTag(OPENID_1_0_NS, 'Delegate')
delegates = service_element.findall(delegate_tag)
for delegate_element in delegates:
delegate = delegate_element.text
break
else:
delegate = None
return delegate
def discoverYadis(uri):
"""Discover OpenID services for a URI. Tries Yadis and falls back
on old-style discovery if Yadis fails.
@param uri: normalized identity URL
@type uri: str
@return: (identity_url, services)
@rtype: (str, list(OpenIDServiceEndpoint))
@raises: DiscoveryFailure
"""
# Might raise a yadis.discover.DiscoveryFailure if no document
# came back for that URI at all. I don't think falling back
# to OpenID 1.0 discovery on the same URL will help, so don't
# bother to catch it.
response = yadisDiscover(uri)
identity_url = response.normalized_uri
try:
openid_services = extractServices(
response.normalized_uri, response.response_text,
OpenIDServiceEndpoint)
except XRDSError:
# Does not parse as a Yadis XRDS file
openid_services = []
if not openid_services:
# Either not an XRDS or there are no OpenID services.
if response.isXRDS():
# if we got the Yadis content-type or followed the Yadis
# header, re-fetch the document without following the Yadis
# header, with no Accept header.
return discoverNoYadis(uri)
else:
body = response.response_text
# Try to parse the response as HTML to get OpenID 1.0/1.1
#
try:
service = OpenIDServiceEndpoint.fromHTML(identity_url, body)
except ParseError:
pass # Parsing failed, so return an empty list
else:
openid_services = [service]
return (identity_url, openid_services)
def discoverXRI(iname):
endpoints = []
try:
canonicalID, services = xrires.ProxyResolver().query(
iname, OpenIDServiceEndpoint.openid_type_uris)
flt = filters.mkFilter(OpenIDServiceEndpoint)
for service_element in services:
endpoints.extend(flt.getServiceEndpoints(iname, service_element))
except XRDSError:
oidutil.log('xrds error on ' + iname)
for endpoint in endpoints:
# Is there a way to pass this through the filter to the endpoint
# constructor instead of tacking it on after?
endpoint.canonicalID = canonicalID
# FIXME: returned xri should probably be in some normal form
return iname, endpoints
def discoverNoYadis(uri):
http_resp = fetchers.fetch(uri)
if http_resp.status != 200:
raise DiscoveryFailure(
'HTTP Response status from identity URL host is not 200. '
'Got status %r' % (http_resp.status,), http_resp)
identity_url = http_resp.final_url
# Try to parse the response as HTML to get OpenID 1.0/1.1
#
try:
service = OpenIDServiceEndpoint.fromHTML(identity_url, http_resp.body)
except ParseError:
openid_services = []
else:
openid_services = [service]
return identity_url, openid_services
if yadis_available:
discover = discoverYadis
else:
discover = discoverNoYadis