#!/usr/bin/env python __copyright__ = 'Copyright 2005, Janrain, Inc.' from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from urlparse import urlparse import time import Cookie import cgi import cgitb import sys def quoteattr(s): qs = cgi.escape(s, 1) return '"%s"' % (qs,) try: import openid except ImportError: print >>sys.stderr, """ Failed to import the OpenID library. In order to use this example, you must either install the library (see INSTALL in the root of the distribution) or else add the library to python's import path (the PYTHONPATH environment variable). For more information, see the README in the root of the library distribution or http://www.openidenabled.com/ """ sys.exit(1) from openid import oidutil from openid.server import server from openid.store.filestore import FileOpenIDStore class OpenIDHTTPServer(HTTPServer): """ http server that contains a reference to an OpenID Server and knows its base URL. """ def __init__(self, oidserver, *args, **kwargs): HTTPServer.__init__(self, *args, **kwargs) if self.server_port != 80: self.base_url = ('http://%s:%s/' % (self.server_name, self.server_port)) else: self.base_url = 'http://%s/' % (self.server_name,) self.openid = oidserver self.approved = {} self.lastCheckIDRequest = {} class ServerHandler(BaseHTTPRequestHandler): def __init__(self, *args, **kwargs): self.user = None BaseHTTPRequestHandler.__init__(self, *args, **kwargs) def do_GET(self): try: self.parsed_uri = urlparse(self.path) self.query = {} for k, v in cgi.parse_qsl(self.parsed_uri[4]): self.query[k] = v self.setUser() path = self.parsed_uri[2].lower() if path == '/': self.showMainPage() elif path == '/openidserver': self.serverEndPoint(self.query) elif path == '/login': self.showLoginPage('/', '/') elif path == '/loginsubmit': self.doLogin() else: self.showIdPage(path) except (KeyboardInterrupt, SystemExit): raise except: self.send_response(500) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(cgitb.html(sys.exc_info(), context=10)) def do_POST(self): try: self.parsed_uri = urlparse(self.path) self.setUser() content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) self.query = {} for k, v in cgi.parse_qsl(post_data): self.query[k] = v path = self.parsed_uri[2] if path == '/openidserver': self.serverEndPoint(self.query) elif path == '/allow': self.handleAllow(self.query) else: self.send_response(404) self.end_headers() except (KeyboardInterrupt, SystemExit): raise except: self.send_response(500) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(cgitb.html(sys.exc_info(), context=10)) def handleAllow(self, query): # pretend this next bit is keying off the user's session or something, # right? request = self.server.lastCheckIDRequest.get(self.user) if 'yes' in query: identity = request.identity trust_root = request.trust_root if self.query.get('remember', 'no') == 'yes': duration = 'always' else: duration = 'once' self.server.approved[(identity, trust_root)] = duration if 'login_as' in query: self.user = self.query['login_as'] response = request.answer(True) elif 'no' in query: response = request.answer(False) else: assert False, 'strange allow post. %r' % (query,) self.displayResponse(response) def setUser(self): cookies = self.headers.get('Cookie') if cookies: morsel = Cookie.BaseCookie(cookies).get('user') if morsel: self.user = morsel.value def isAuthorized(self, identity_url, trust_root): if self.user is None: return False if identity_url != self.server.base_url + self.user: return False key = (identity_url, trust_root) approval = self.server.approved.get(key) if approval == 'once': del self.server.approved[key] return approval is not None def serverEndPoint(self, query): try: request = self.server.openid.decodeRequest(query) except server.ProtocolError, why: self.displayResponse(why) return if request is None: # Display text indicating that this is an endpoint. self.showAboutPage() return if request.mode in ["checkid_immediate", "checkid_setup"]: if (self.user and self.isAuthorized(request.identity, request.trust_root)): response = request.answer(True) elif request.immediate: response = request.answer( False, server_url=self.server.base_url + 'openidserver') else: self.server.lastCheckIDRequest[self.user] = request self.showDecidePage(request) return else: response = self.server.openid.handleRequest(request) self.displayResponse(response) def displayResponse(self, response): try: webresponse = self.server.openid.encodeResponse(response) except server.EncodingError, why: text = why.response.encodeToKVForm() self.showErrorPage('
%s' % cgi.escape(text)) return self.send_response(webresponse.code) for header, value in webresponse.headers.iteritems(): self.send_header(header, value) self.writeUserHeader() self.end_headers() if webresponse.body: self.wfile.write(webresponse.body) def doLogin(self): if 'submit' in self.query: if 'user' in self.query: self.user = self.query['user'] else: self.user = None self.redirect(self.query['success_to']) elif 'cancel' in self.query: self.redirect(self.query['fail_to']) else: assert 0, 'strange login %r' % (self.query,) def redirect(self, url): self.send_response(302) self.send_header('Location', url) self.writeUserHeader() self.end_headers() def writeUserHeader(self): if self.user is None: t1970 = time.gmtime(0) expires = time.strftime( 'Expires=%a, %d-%b-%y %H:%M:%S GMT', t1970) self.send_header('Set-Cookie', 'user=;%s' % expires) else: self.send_header('Set-Cookie', 'user=%s' % self.user) def showAboutPage(self): endpoint_url = self.server.base_url + 'openidserver' def link(url): url_attr = quoteattr(url) url_text = cgi.escape(url) return '
%s' % (url_attr, url_text)
def term(url, text):
return '%s is an OpenID server endpoint.
For more information about OpenID, see:
%s
''' % error_message) def showDecidePage(self, request): expected_user = request.identity[len(self.server.base_url):] if expected_user == self.user: msg = '''\A new site has asked for your identity. If you approve, the site represented by the trust root below will be told that you control identity URL listed below. (If you are using a delegated identity, the site will take care of reversing the delegation on its own.)
''' fdata = { 'identity': request.identity, 'trust_root': request.trust_root, } form = '''\| Identity: | %(identity)s |
| Trust Root: | %(trust_root)s |
Allow this authentication to proceed?
''' % fdata else: mdata = { 'expected_user': expected_user, 'user': self.user, } msg = '''\A site has asked for an identity belonging to %(expected_user)s, but you are logged in as %(user)s. To log in as %(expected_user)s and approve the login request, hit OK below. The "Remember this decision" checkbox applies only to the trust root decision.
''' % mdata fdata = { 'identity': request.identity, 'trust_root': request.trust_root, 'expected_user': expected_user, } form = '''\| Identity: | %(identity)s |
| Trust Root: | %(trust_root)s |
Allow this authentication to proceed?
''' % fdata self.showPage(200, 'Approve OpenID request?', msg=msg, form=form) def showIdPage(self, path): tag = '' %\ self.server.base_url ident = self.server.base_url + path[1:] approved_trust_roots = [] for (aident, trust_root) in self.server.approved.keys(): if aident == ident: trs = 'Approved trust roots:
\nThis is an identity page for %s.
%s ''' % (ident, msg)) def showMainPage(self): if self.user: openid_url = self.server.base_url + self.user user_message = """\You are logged in as %s. Your OpenID identity URL is %s. Enter that URL at an OpenID consumer to test this server.
""" % (self.user, quoteattr(openid_url), openid_url) else: user_message = """\This server uses a cookie to remember who you are in order to simulate a standard Web user experience. You are not logged in.
""" self.showPage(200, 'Main Page', msg='''\This is a simple OpenID server implemented using the Python OpenID library.
%sTo use this server with a consumer, the consumer must be able to fetch HTTP pages from this web server. If this computer is behind a firewall, you will not be able to use OpenID consumers outside of the firewall with it.
The URL for this server is %s.
''' % (user_message, quoteattr(self.server.base_url), self.server.base_url)) def showLoginPage(self, success_to, fail_to): self.showPage(200, 'Login Page', form='''\You may log in with any name. This server does not use passwords because it is just a sample of how to use the OpenID library.
''' % (success_to, fail_to)) def showPage(self, response_code, title, link_tag='', msg=None, err=None, form=None): if self.user is None: user_link = 'not logged in.' else: user_link = 'logged in as %s.
Python OpenID Server Example |
You are %(user_link)s |