""" Zope's user folder that keeps users in MySQL tables. """ __version__ = "$Rev: 73 $" # $URL: svn://localhost/mysqlUserFolder/trunk/mysqlUserFolder.py $ # # $LastChangedBy: vladap $ # $LastChangedDate: 2005-09-26 00:29:12 +0300 (Mon, 26 Sep 2005) $ import Globals, OFS, AccessControl import time, whrandom, array, threading, base64, os, string import cfg, util, vdb, db, log, passwords om_class = OFS.ObjectManager.ObjectManager # --------------------------------------------------------------------- */ # Public functions def manage_add_mysqlUserFolder (Self, realm, use_cookies, use_sessions, server, port, socket, database, username, password, anon_create_ok, anon_create_role, dtml_user_dir, cookie_path, cookie_domain, cookie_use_realm, disable_cookies_for_ports, default_password_type, REQUEST): """ Add new mysqlUserFolder """ if 'acl_users' in Self.objectIds (): return Globals.MessageDialog( title ='Item Exists', message='This object already contains a User Folder', action ='%s/manage_main' % REQUEST['URL1'] ) o = mysqlUserFolder (realm, server, port, socket, database, username, password, use_cookies, use_sessions, anon_create_ok, anon_create_role, cookie_path, cookie_domain, cookie_use_realm, dtml_user_dir, disable_cookies_for_ports, default_password_type, REQUEST) Self._setObject ('acl_users', o) Self.__allow_groups__ = Self.acl_users ob = Self._getOb ('acl_users') try: ob._addRoles (ob.aq_parent) except AttributeError: pass if REQUEST: return Self.manage_main (Self, REQUEST, update_menu = 1) # --------------------------------------------------------------------- */ # Utility functions def _add_dtml_method (obj, fname, id, title): (p, n) = os.path.split (fname) if string.find (n, '.') == -1: fname = fname + '.dtml' f = open (fname) d = f.read () f.close () obj.manage_addDTMLDocument (id, title, d) def _add_role (obj, role): obj._addRole (role) # --------------------------------------------------------------------- */ # Main class class mysqlUserFolder 
(AccessControl.User.BasicUserFolder, OFS.ObjectManager.ObjectManager): """ """ meta_type = 'MySQL User Folder' id = 'acl_users' title = 'mysqlUserFolder' icon = 'misc_/mysqlUserFolder/folder_icon' manage_options = ( { 'label': 'Contents', 'action': 'manage_main' }, { 'label': 'Info', 'action': 'manage_info' }, { 'label': 'Properties', 'action': 'manage_properties' }, { 'label': 'Parameters', 'action': 'manage_parameters' }, { 'label': 'Manage Users', 'action': 'manage_users' }, { 'label': 'Security', 'action': 'manage_access' }, { 'label': 'Undo', 'action': 'manage_UndoForm' }, { 'label': 'Advanced', 'action': 'manage_advanced' }, ) __ac_permissions__ = ( ('View management screens', ['manage','manage_menu','manage_main','manage_copyright', 'manage_tabs', 'manage_properties', 'manage_UndoForm', 'manage_parameters', 'manage_info', 'manage_dialog']), ('Undo changes', ['manage_undo_transactions']), ('Change permissions', ['manage_access']), ('Manage users', ['manage_users', 'manage_users_new', 'manage_users_edit', 'manage_users_create', 'manage_users_change_other', 'manage_users_change_password', 'manage_users_delete', 'manage_users_change_roles', 'manage_users_change_domains', 'manage_role_delete', 'manage_role_create', 'getUserCount', 'getRoleCount', 'getUserNames', 'getRoleNames', 'getUsers', 'getUser', 'hasUsers', ]), ('Manage MySQL User Folders', ['manage_edit', 'manage_connect', 'manage_edit_parameters', 'manage_addMySQLRoles', 'manage_main', 'manage_setup_user_actions', 'manage_advanced', ]), (cfg.MYSQL_USER_EDIT_PERMISSION, ['user_change_other_data', 'user_change_password', 'user_create', ]), ) manage_properties = \ Globals.DTMLFile ('dtml/m_Properties', globals (), mchange = 1) manage_info = \ Globals.DTMLFile ('dtml/m_Info', globals ()) manage_users = \ Globals.DTMLFile ('dtml/m_Users', globals ()) manage_users_edit = \ Globals.DTMLFile ('dtml/m_UserEdit', globals ()) manage_users_new = \ Globals.DTMLFile ('dtml/m_UserNew', globals ()) manage_parameters = \ 
Globals.DTMLFile ('dtml/m_Parameters', globals ())
    manage_dialog = \
        Globals.DTMLFile ('dtml/m_Dialog', globals ())
    manage_advanced = \
        Globals.DTMLFile ('dtml/m_Advanced', globals ())

    # Object-manager views are taken from ObjectManager explicitly.
    manage_main = om_class.manage_main
    objectIds = om_class.objectIds
    objectValues = om_class.objectValues
    objectMap = om_class.objectMap
    objectItems = om_class.objectItems

    def __init__ (Self, realm = '*',
                  mysql_server='localhost', mysql_port = 0, mysql_socket='',
                  mysql_db = 'Zope', mysql_user = 'Zope',
                  mysql_password = 'Zope',
                  use_cookies = 0, use_sessions = 0,
                  anon_create_ok = 0, anon_create_role = '',
                  cookie_path = '/', cookie_domain = '',
                  cookie_use_realm = 0,
                  dtml_user_dir = "dtml.user",
                  disable_cookies_for_ports = '',
                  default_password_type = 0,
                  REQUEST = None):
        # Store the configuration, then connect to the database and install
        # the user-facing DTML documents.  A failing connect propagates, so
        # the folder is not created when the database is unreachable.
        if not cookie_path:
            cookie_path = '/'
        Self.mysql_server = mysql_server
        Self.mysql_port = mysql_port
        Self.mysql_socket = string.strip (mysql_socket)
        Self.mysql_db = mysql_db
        Self.mysql_user = mysql_user
        Self.mysql_password = mysql_password
        Self.realm = realm
        Self.use_cookies = use_cookies
        Self.use_sessions = use_sessions
        Self.default_password_type = default_password_type
        Self.anon_create_ok = anon_create_ok
        Self.anon_create_role = anon_create_role
        Self.cookie_path = cookie_path
        Self.cookie_domain = cookie_domain
        Self.cookie_use_realm = cookie_use_realm
        Self.dtml_user_dir = dtml_user_dir
        Self._setupDisabledPorts (disable_cookies_for_ports)
        Self.__setupParameters ()
        Self.__connect ()
        Self.__initUserDtml ()

    def __del__ (Self):
        util.log_debug ("__del__ () called ().")
        #Self.__disconnect ()

    def __setstate__ (Self, State):
        # Called when the persistent object is loaded.  Fills in attributes
        # that are missing from the stored state and resets the volatile
        # (_v_*) database handles; the connection is re-opened lazily by
        # __check_connection ().
        util.log_debug ("__setstate__ () called.")
        Globals.Persistent.__setstate__ (Self, State)
        if not hasattr (Self, 'cookie_path'):
            Self.cookie_path = '/'
        if not hasattr (Self, 'cookie_domain'):
            Self.cookie_domain = ''
        if not hasattr (Self, 'disable_ports'):
            Self.disable_ports = []
        if not hasattr (Self, 'mysql_socket'):
            Self.mysql_socket = ''
        if not hasattr (Self, 'cookie_use_realm'):
            Self.cookie_use_realm = 0
        if not hasattr (Self, 'default_password_type'):
            Self.default_password_type = 0
        Self.__setupParameters ()
        #Self.__connect ()
        Self._v_db = None
        Self._v_user_db = None
        Self._v_session_db = None
        Self._v_token_db = None
        Self._v_miscdata_db = None

    def __initUserDtml (Self):
        # Install the login/user pages and the act* documents from the
        # product's dtml_user_dir into this folder.
        pdir = util.base_prod_dir ()
        dtml_dir = pdir + '/' + Self.dtml_user_dir
        fname = '%s/u_Login.dtml' % dtml_dir
        _add_dtml_method (Self, fname, 'docLogin', 'User login form')
        fname = '%s/u_UserPage' % dtml_dir
        _add_dtml_method (Self, fname, 'docUserPage', "User's page")
        fname = '%s/u_UserNew' % dtml_dir
        _add_dtml_method (Self, fname, 'docNewUser', 'New user form')
        fname = '%s/a_ChangeOther' % dtml_dir
        _add_dtml_method (Self, fname, 'actChangeOther', 'Change user data')
        fname = '%s/a_ChangePassword' % dtml_dir
        _add_dtml_method (Self, fname, 'actChangePassword',
                          'Change user pass')
        fname = '%s/a_CreateUser' % dtml_dir
        _add_dtml_method (Self, fname, 'actCreateUser', 'Create a user')

    def __setupParameters (Self):
        # Give every token/session parameter a default unless it is
        # already set on the instance.
        if not hasattr (Self, 'user_token_timeout'):
            Self.user_token_timeout = -1
        if not hasattr (Self, 'user_token_life'):
            Self.user_token_life = 2592000 # 30 days
        if not hasattr (Self, 'user_token_persistent'):
            Self.user_token_persistent = 1
        if not hasattr (Self, 'session_token_timeout'):
            Self.session_token_timeout = 900 # 15 mins
        if not hasattr (Self, 'session_token_life'):
            Self.session_token_life = 36000 # 10 hours
        if not hasattr (Self, 'session_token_persistent'):
            Self.session_token_persistent = 0
        if not hasattr (Self, 'session_log_simple'):
            Self.session_log_simple = 1
        if not hasattr (Self, 'session_log_full'):
            Self.session_log_full = 0

    def _addRoles (Self, ZopeParent):
        # Register the system roles plus the roles stored in the database
        # on ZopeParent; the Anonymous and Authenticated roles are skipped.
        if ZopeParent:
            roles = cfg.ZOPE_SYSTEM_ROLES
            if Self._v_user_db:
                roles = roles + Self._v_user_db.list_rolenames ()
            util.log_debug ("connect (): Adding roles: " + `roles`)
            for role in roles:
                if role not in [cfg.ZOPE_ANONYMOUS_ROLE,
                                cfg.ZOPE_AUTHENTICATED_ROLE]:
                    _add_role (ZopeParent, role)

    def _delRoles (Self, ZopeParent, roles):
        # Remove `roles` from ZopeParent, never touching system roles or
        # the manager role.
        if ZopeParent:
            for r in roles:
                if (r in cfg.ZOPE_SYSTEM_ROLES) or \
                   (r == cfg.ZOPE_MANAGER_ROLE):
                    continue
                ZopeParent._delRoles (roles = [r], REQUEST = None)

    def _setupDisabledPorts (Self, ports):
        # `ports` is a whitespace-separated list; non-string input is
        # stringified first.  The result is stored as a list of strings.
        if type (ports) != type (''):
            ports = "%s" % (ports,)
        Self.disable_ports = string.split (string.strip (ports))

    # ------------------------
    # connect / disconnect

    def __is_connected (Self):
        # Returns 1 when a connection exists and reports no errors.  A
        # connection with errors is dropped and 0 is returned.
        if Self._v_db is None:
            return 0
        if Self._v_db.has_errors ():
            util.log_debug ("DB Connection has errors, reconnecting.")
            Self.__disconnect ()
            return 0
        return 1

    def __connect (Self):
        # Open the database connection and build the helper objects on top
        # of it.  Any connection error is re-raised after _v_db is reset.
        util.log_debug ("connect () called.")
        Self._v_db = None
        try:
            Self._v_db = db.dbConnection (Self.mysql_server,
                                          Self.mysql_port,
                                          Self.mysql_socket,
                                          Self.mysql_db,
                                          Self.mysql_user,
                                          Self.mysql_password)
            Self._v_db.connect ()
        except:
            Self._v_db = None
            raise
        ci = vdb.HTTPCookieInfo ()
        ci.path = Self.cookie_path
        ci.domain = Self.cookie_domain
        ci.use_realm = Self.cookie_use_realm
        Self._v_token_db = vdb.TokenDb (Self._v_db, Self.realm, ci)
        Self._v_user_db = vdb.UserDb (Self._v_db, Self.realm)
        Self._v_session_db = vdb.SessionDb (Self._v_db)
        Self._v_miscdata_db = vdb.MiscDataDB (Self._v_db)

    def __disconnect (Self):
        # Best-effort close: errors from disconnect () are ignored and all
        # volatile handles are cleared regardless.
        try:
            if Self._v_db:
                Self._v_db.disconnect ()
        except:
            pass
        Self._v_db = None
        Self._v_token_db = None
        Self._v_user_db = None
        Self._v_session_db = None
        Self._v_miscdata_db = None

    def __check_connection (Self):
        # (Re)connect when there is no usable connection.
        if not Self.__is_connected ():
            Self.__connect ()

    # ------------------------
    # Session/user utility functions

    def __terminate_session (Self, REQUEST, RESPONSE, session):
        # Log the end of the session (when simple logging is on), delete
        # its token and clear REQUEST ['Session'].
        if Self.session_log_simple:
            log.session_log_end (Self._v_db, Self.realm, session)
        Self._v_token_db.request_delete_token (RESPONSE, 'session',
                                               session.getId ())
        REQUEST ['Session'] = None

    def __create_session (Self, REQUEST, RESPONSE, user):
        # Create a session for `user` (or for cfg.VDB_INVALID_ID when user
        # is false) and assign it a new 'session' token.
        if user:
            user_id = user.getDBId ()
        else:
            user_id = cfg.VDB_INVALID_ID
        session = Self._v_session_db.create_session (user_id)
        Self._v_token_db.request_assign_new_token (RESPONSE, 'session',
                                                   session.getId (),
Self.session_token_timeout,
                                                   Self.session_token_life,
                                                   Self.session_token_persistent)
        session._set_user_folder (Self)
        if Self.session_log_simple:
            log.session_log_begin (Self._v_db, Self.realm, session)
        return session

    def __get_session_user (Self, session):
        """
        This function returns the user object that corresponds to a session.

        There are three possibilities: a valid user is assigned to the
        session and it belongs to the user folder's realm, it does not
        belong to the user folder's realm, or there is no valid user
        assigned to the session.
        """
        if session is None:
            return None
        user_id = session.getUserId ()
        if user_id != cfg.VDB_INVALID_ID:
            user = Self._v_user_db.get_user_by_id (user_id)
            if user:
                # A user from another realm is treated as a programming
                # error, not as a failed lookup.
                assert (user.getRealm () == Self.realm)
        else:
            user = None
        return user

    def __get_session_from_request (Self, REQUEST):
        # Resolve the 'session' token carried by REQUEST to a session
        # object; returns whatever get_session_by_id () yields.
        sess_id = Self._v_token_db.request_check_token (REQUEST, 'session')
        session = Self._v_session_db.get_session_by_id (sess_id)
        if session:
            session._set_user_folder (Self)
        return session

    def __setup_session (Self, REQUEST, user, session, amethod, target_path):
        """
        This method sets up the user session. It assumes the user is
        authenticated/authorized. If user is None, then it assumes
        Anonymous is authorized. Parameter amethod represents the way
        the user is authenticated.
        """
        # NOTE(review): the source lost its indentation; the nesting of
        # this method was reconstructed so that every name is bound before
        # use.  Confirm against the pristine file.
        RESPONSE = REQUEST ['RESPONSE']
        if session:
            anon_session = session.isAnonymousUser ()
            # Is session userid in the current realm ?
            if not anon_session:
                wrong_session_user = 0
                if not anon_session:
                    suser_id = session.getUserId ()
                    if user is None:
                        wrong_session_user = 1
                    else:
                        if user.getDBId () != suser_id:
                            wrong_session_user = 1
                if wrong_session_user:
                    # The session belongs to somebody else: end it; a
                    # fresh one is created below.
                    util.log_debug ("__setup_session (): Wrong session user.")
                    Self.__terminate_session (REQUEST, RESPONSE, session)
                    anon_session = 0
                    session = None
                    assert (amethod != 'Cookie') # Since there is session user
        # use_sessions is true, since this function is called
        if not session:
            session = Self.__create_session (REQUEST, RESPONSE, user)
            anon_session = 0
        if user and anon_session:
            # An anonymous session is upgraded to the authenticated user.
            Self._v_session_db.assign_user_id (session, user.getDBId ())
            util.log_debug ("__setup_session (): Assigning " +
                            "user '%s' to the session." % user.getUserName ())
        REQUEST ['Session'] = session
        if Self.session_log_full:
            obj = util.get_obj_from_request (REQUEST)
            addr = util.get_address_from_request (REQUEST)
            if REQUEST.has_key ('QUERY_STRING'):
                query_string = REQUEST ['QUERY_STRING']
            else:
                query_string = ""
            # Anything that is not a plain string is logged as empty.
            if type (query_string) != type (""):
                query_string = ""
            log.session_log_access (Self._v_db, Self.realm, session,
                                    target_path, query_string, obj, addr)

    # ------------------------
    # User folder's standard interface functions
    # They are all protected by 'Manage Users' permission

    def hasUsers (Self):
        return Self.getUserCount () != 0

    def getUserCount (Self):
        """ Returns number of users. """
        Self.__check_connection ()
        return Self._v_user_db.get_user_count ()

    def getRoleCount (Self):
        """ Returns number of roles. """
        Self.__check_connection ()
        return Self._v_user_db.get_role_count ()

    def getUserNames (Self, frole = None):
        """Returns a list of user names or [] if no users exist.
        Optional role can be used to filter usernames by role.
""" Self.__check_connection () return Self._v_user_db.list_usernames (frole) def getRoleNames (Self): """Returns a list of role names or [] if no roles exist""" Self.__check_connection () return Self._v_user_db.list_rolenames () def getUsers (Self): """Returns a list of user objects or [] if no users exist""" Self.__check_connection () usernames = Self._v_user_db.list_usernames () users = [] for uname in usernames: user = Self._v_user_db.get_user (uname) if user: user._set_user_folder (Self) users.append (user) return users def getUser (Self, username): """Returns the named user object or None if no such user exists""" Self.__check_connection () user = Self._v_user_db.get_user (username) if user: user._set_user_folder (Self) # Don't know why, but Zope wants it without acquisition #return user.__of__ (Self) return user return None # ------------------------ # Authentication / autorization functions authenticate_user_pass__roles__ = () def authenticate_user_pass (Self, username, password, REQUEST): user = Self._v_user_db.get_user (username) if not user: return None if user.authenticate_password (password, REQUEST): return user # Since password is wrong, user is refreshed. 
Self._v_user_db.refresh_user (user) if user.authenticate_password (password, REQUEST): return user return None authenticate_session__roles__ = () def authenticate_session (Self, REQUEST): session = Self.__get_session_from_request (REQUEST) user = Self.__get_session_user (session) if session: util.log_debug ("authenticate_session (): " "Valid session found: %i" % session.getId ()) else: util.log_debug ("authenticate_session (): No valid session.") if user: util.log_debug ("authenticate_session (): Valid user found.") return (user, session) authenticate_cookie__roles__ = () def authenticate_cookie (Self, REQUEST): user_id = Self._v_token_db.request_check_token (REQUEST, 'user') util.log_debug ("authenticate_cookie (): got user id: %s" % user_id) if (user_id != cfg.VDB_INVALID_ID): user = Self._v_user_db.get_user_by_id (user_id) else: user = None if user is None: util.log_debug ("authenticate_cookie (): No valid user.") else: util.log_debug ("authenticate_cookie (): Got valid user.") return user authenticate_http__roles__ = () def authenticate_http (Self, REQUEST, auth): if auth is None: return None if auth == '': return None auth_type = string.lower (string.split (auth) [0]) if auth_type != 'basic': util.log_error ("Unknown http auth type.") return None username, password = tuple (string.split (base64.decodestring ( string.split (auth) [-1]), ':')) util.log_debug ("authenticate_http () called: " + username + " " + password) return Self.authenticate_user_pass (username, password, REQUEST) authenticate_main__roles__ = () def authenticate_main (Self, REQUEST, auth, use_cookies, use_sessions): user = None session = None amethod = None if use_sessions: (user, session) = Self.authenticate_session (REQUEST) if not use_cookies: user = None # We are not using session auth if we are not using cookies. 
if user is None: # Try standard method if use_cookies: user = Self.authenticate_cookie (REQUEST) if user: amethod = 'Cookie' else: user = Self.authenticate_http (REQUEST, auth) if user: amethod = 'HTTP' else: amethod = 'Session' if user: if not user.check_domain_spec (REQUEST): util.log_debug ("authenticate (): Domain spec check failed.") user = None session = None amethod = None if user: util.log_debug ("authenticate (): " "User authenticated using %s." % amethod) else: util.log_debug ("----- authenticate (): User not authenticated.") return (user, session, amethod) authorize_main__roles__ = () def authorize_main (Self, user, roles, REQUEST): util.log_debug ("authorize (): Username: " + user.getUserName () + ", User roles: " + `user.getRoles ()` + ", Requested roles: " + `roles` + ".") v = REQUEST ['PUBLISHED'] # the published object a, c, n, v = Self._getobcontext(v, REQUEST) allowed = AccessControl.User.BasicUserFolder.authorize \ (Self, user, a, c, n, v, roles) if not allowed: util.log_debug ('----- authorize (): Not authorized.') return None else: util.log_debug ('+++++ authorize (): Authorized.') return user # ------------------------ # Main validatation def check_auth_config (Self, REQUEST): use_cookies = Self.use_cookies use_sessions = Self.use_sessions port = util.get_port_from_request (REQUEST) if (port in Self.disable_ports): util.log_debug ("validate (): disabling cookie auth.") use_cookies = 0 use_sessions = 0 return (use_cookies, use_sessions) def validate (Self, REQUEST, auth = None, roles = []): RESPONSE = REQUEST ['RESPONSE'] target_path = util.get_path_from_request (REQUEST) util.log_debug ("***** \nvalidate () called: [%s], auth = '%s', " "path = '%s'" % (roles,auth, target_path)) if cfg.VALIDATE_ALWAYS_SUPER: util.log_debug ("Always validating super.") return AccessControl.SpecialUsers.super.__of__ (Self) Self.__check_connection () if roles is None: roles = [cfg.ZOPE_ANONYMOUS_ROLE, ] if auth is None: auth = '' (use_cookies, use_sessions) = 
Self.check_auth_config (REQUEST) (user, session, method) = \ Self.authenticate_main (REQUEST, auth, use_cookies, use_sessions) if not user: # Try anonymous if (use_cookies) and (cfg.ZOPE_ANONYMOUS_ROLE in roles): util.log_debug ("Validating nobody.") if use_sessions: Self.__setup_session (REQUEST, None, session, method, target_path) return AccessControl.SpecialUsers.nobody.__of__ (Self) # We need to ask for a login if use_cookies: REQUEST ['login_goto'] = target_path REQUEST ['exception_raised'] = 1 raise 'LoginRequired', Self.docLogin (Self, REQUEST) else: return None # We have a valid authenticated user user._set_user_folder (Self) user = Self.authorize_main (user, roles, REQUEST) if not user: raise 'Forbidden', \ "You are not authorized to access this document." if Self.use_sessions: Self.__setup_session (REQUEST, user, session, method, target_path) if cfg.VALIDATE_ONLY_ANONYMOUS: if cfg.ZOPE_ANONYMOUS_ROLE in roles: return AccessControl.SpecialUsers.nobody.__of__ (Self) else: raise 'Forbidden', \ "Only anonymous access is allowed." 
return user.__of__ (Self) # ------------------------ # User folder internal interface - GRUF needs this def authenticate_cmf (Self, username, password, REQUEST): RESPONSE = REQUEST ['RESPONSE'] # Let's check if user is valid user = Self.authenticate_user_pass (username, password, REQUEST) if not user: return None # Set cookies Self.user_login (REQUEST, RESPONSE, username, password, login_goto_path = None) return user def authenticate (Self, name, password, REQUEST): util.log_debug ("authenticate_GRUF () called: '%s', '%s'" % (name, password)) if cfg.VALIDATE_ALWAYS_SUPER: util.log_debug ("Always validating super.") return AccessControl.SpecialUsers.super.__of__ (Self) (use_cookies, use_sessions) = Self.check_auth_config (REQUEST) Self.__check_connection () if use_cookies: # Cookie auth r = Self.authenticate_main (REQUEST, None, use_cookies, use_sessions) (user, session, amethod) = r target_path = util.get_path_from_request (REQUEST) if not user: msg = "authenticate_GRUF (): cookie: trying username/password" util.log_debug (msg) user = Self.authenticate_cmf (name, password, REQUEST) if not user: util.log_debug ("authenticate_GRUF (): cookie: username/pass failed.") if use_sessions: Self.__setup_session (REQUEST, None, session, None, target_path) return None util.log_debug ("authenticate_GRUF (): cookie: username/pass OK") else: util.log_debug ("authenticate_GRUF (): authenticated by cookie") if use_sessions: Self.__setup_session (REQUEST, user, session, amethod, target_path) else: # No cookies if (not name) or (not password): util.log_debug ("authenticate_GRUF (): passwd: no valid data") return None user = Self.authenticate_user_pass (name, password, REQUEST) if not user: util.log_debug ("authenticate_GRUF (): passwd: invalid login") return None util.log_debug ("authenticate_GRUF (): passwd login ok") user._set_user_folder (Self) return user # ------------------------ # Other interface functions. # # miscdata_ functions () are called from user and session objects. 
# refresh_user () is called from user objects. getDefaultPasswordType = None def getDefaultPasswordType (Self): return Self.default_password_type password_types__roles__ = None def password_types (Self): return passwords.PASSWD_ENC_LIST [:] is_connected__roles__ = None def is_connected (Self): return Self.__is_connected () moduleDebugLevel__roles__ = None def moduleDebugLevel (Self): return cfg.DEBUG create_user__roles__ = () def create_user (Self, username, password, password_type, roles, realname, email, domains = []): """ This function creates a user. It expects a clear text password. """ Self.__check_connection () user = Self._v_user_db.create_user (username, roles) if not user: return 0 if (domains is None) or (domains == ''): domains = [] user._setAuthData (roles, domains) Self._v_user_db.change_user_domains (user, domains) Self._v_user_db.change_user_other (user, realname, email) Self._v_user_db.change_user_password (user, password, password_type) return 1 check_valid_mysql_user__roles__ = None def check_valid_mysql_user (Self, REQUEST): Self.__check_connection () if not REQUEST.has_key ('AUTHENTICATED_USER'): raise 'Unauthorized' user = REQUEST ['AUTHENTICATED_USER'] try: user.isAmysqlUserObject except AttributeError: raise 'NotAmysqlUser' if user.getRealm () != Self.realm: raise 'InvalidUser' return user miscdata_set__roles__ = () def miscdata_set (Self, obj_class, obj_id, misc_id, misc_tuple): Self._v_miscdata_db.save_data (obj_class, obj_id, misc_id, misc_tuple) miscdata_del__roles__ = () def miscdata_del (Self, obj_class, obj_id, misc_id): Self._v_miscdata_db.del_data (obj_class, obj_id, misc_id) miscdata_get__roles__ = () def miscdata_get (Self, obj_class, obj_id, misc_id): return Self._v_miscdata_db.get_data (obj_class, obj_id, misc_id) refresh_user__roles__ = () def refresh_user (Self, user): return Self._v_user_db.refresh_user (user) getDBFieldSize__roles__ = None def getDBFieldSize (Self, fname): n = "FSIZE_" + fname val = cfg.__dict__ [n] return 
val getSpecialRoles__roles__ = None def getSpecialRoles (Self): return cfg.ZOPE_SYSTEM_ROLES UserFolderUsesCookies__roles__ = None def UserFolderUsesCookies (Self): return Self.use_cookies # -------------------------------- # Special zope interface functions def _doAddUser (self, name, password, roles, domains, **kw): dp = self.default_password_type password_type = kw.get ('password_type', dp) email = kw.get ('email', '') realname = kw.get ('realname', '') self.manage_users_create (name, password, password_type, roles, realname, email, domains) def _doChangeUser (self, name, password, roles, domains, **kw): dp = self.default_password_type password_type = kw.get ('password_type', dp) email = kw.get ('email', '') realname = kw.get ('realname', '') self.manage_users_change_roles (None, name, roles) if realname or email: self.manage_users_change_other (None, name, realname, email) if password: self.manage_users_change_password (None, name, password, password_type) def _doDelUsers (self, names): for n in names: self.manage_users_delete (None, n) # -------------------------------- # Disabling some uneeded inherited functions def manage_userFolderProperties (Self, REQUEST=None, manage_tabs_message = None): raise AccessControl.User.NotImplemented def manage_setUserFolderProperties (Self, ep=0, up=0, maxlistusers=0, REQUEST=None): raise AccessControl.User.NotImplemented # ------------------------ # Manage functions def manage_setup_user_actions (Self, owner_username, proxy_role, REQUEST= None): """ Setup act* methods.""" owner_username = string.strip (owner_username) proxy_role = string.strip (proxy_role) ll = Self.objectItems ('DTML Document') user = Self.getUser (owner_username) if user is None: raise 'InvalidInput', 'Unknown user %s' % owner_username try: db = user.aq_inner.aq_parent except: db = None if db is None: user = user.__of__ (Self) assert (user.aq_inner.aq_parent) user_roles = user.getRoles () if not proxy_role in user_roles: message = "User '%s' doesn't have a 
role '%s'" \ % (owner_username, proxy_role) raise 'InvalidInput', message for l in ll: (oid, obj) = l if oid in cfg.MYSQL_ACT_METHODS: obj.changeOwnership (user) obj.manage_proxy ((proxy_role, )) Self.manage_role (proxy_role, [cfg.MYSQL_USER_EDIT_PERMISSION]) if REQUEST: return Globals.MessageDialog( title ='act* method setup completed', message='Owner and proxy role has been set up for act methods', action =REQUEST['URL1']+'/manage_advanced', target ='manage_main') def manage_addMySQLRoles (Self, REQUEST=None): """ Add mysql roles to the parent folder. """ parent = Self.aq_parent Self._addRoles (parent) if REQUEST: return Globals.MessageDialog( title ='MySQL roles Added', message='MySQL roles have been added', action =REQUEST['URL1']+'/manage_advanced', target ='manage_main') def manage_edit (Self, realm, use_cookies, server, port, socket, database, username, password, anon_create_ok, anon_create_role, use_sessions, cookie_path, cookie_domain, cookie_use_realm, disable_cookies_for_ports, default_password_type, REQUEST=None): """ Change properties. Property dtml_user_dir cannot be changed here since it is used only when new folder is created. 
""" if not cookie_path: cookie_path = '/' Self.__disconnect () Self.mysql_server = server Self.mysql_port = port Self.mysql_socket = string.strip (socket) Self.mysql_db = database Self.mysql_user = username Self.mysql_password = password Self.anon_create_ok = anon_create_ok Self.anon_create_role = anon_create_role Self.realm = realm Self.use_cookies = use_cookies Self.use_sessions = use_sessions Self.default_password_type = default_password_type Self.cookie_path = cookie_path Self.cookie_domain = cookie_domain Self.cookie_use_realm = cookie_use_realm try: Self.__connect () except: pass Self._setupDisabledPorts (disable_cookies_for_ports) if REQUEST: return Globals.MessageDialog( title ='mysqlUserFolder Changed', message='mysqlUserFolder properties have been updated', action =REQUEST['URL1']+'/manage_properties', target ='manage_main') def manage_edit_parameters (Self, user_token_timeout, user_token_life, user_token_persistent, session_token_timeout, session_token_life, session_token_persistent, session_log_simple, session_log_full, REQUEST): """ Change mysqlUserFolder parameters. 
""" Self.user_token_timeout = user_token_timeout Self.user_token_life = user_token_life Self.user_token_persistent = user_token_persistent Self.session_token_timeout = session_token_timeout Self.session_token_life = session_token_life Self.session_token_persistent = session_token_persistent Self.session_log_simple = session_log_simple Self.session_log_full = session_log_full if REQUEST: return Globals.MessageDialog( title ='mysqlUserFolder Changed', message='mysqlUserFolder parametes have been updated', action =REQUEST['URL1']+'/manage_parameters', target ='manage_main') def manage_connect (Self, REQUEST): """ Reconnects to server """ Self.__disconnect () Self.__connect () if REQUEST: return Globals.MessageDialog( title ='mysqlUserFolder Reconnected', message='mysqlUserFolder is reconnected to server', action =REQUEST['URL1']+'/manage_properties', target ='manage_main') def manage_users_create (Self, username, password, password_type, role = [], realname= '', email='', domains=[], REQUEST=None): """ Creates new user (managment). Parameter role can be typle, list or single role. This parameter is not called roles because of backward compatibility. 
""" Self.__check_connection () if type(role) == type((None,)): roles = list(role) elif type (role) == type ([]): roles = role else: roles = [role] ok = Self.create_user (username, password, password_type, roles, realname, email, domains) if not ok: if REQUEST: return Globals.MessageDialog( title ='User exists', message='Error: Specified user already exists !', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 if REQUEST: return Globals.MessageDialog( title ='User created', message='Specified user is created.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 1 def manage_users_change_roles (Self, REQUEST, username, roles = []): """ Changes user roles (managment) """ if type (roles) != type ([]): roles = [roles] Self.__check_connection () user = Self.getUser (username) if not user: if REQUEST: return Globals.MessageDialog( title ='User does not exist', message='Error: Specified user does not exist.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 ret = Self._v_user_db.change_user_roles (user, roles) if not ret: if REQUEST: return Globals.MessageDialog( title ='DB error', message='Error: DB error.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 if REQUEST: return Self.manage_dialog ( title ='User roles changed', message='User roles are changed.', action =REQUEST['URL1']+'/manage_users_edit', target ='manage_main', hfields = { 'username': username }, ) else: return 1 def manage_users_change_other (Self, REQUEST, username, realname, email): """ Changes user data (managment) """ Self.__check_connection () user = Self.getUser (username) if not user: if REQUEST: return Globals.MessageDialog( title ='User does not exist', message='Error: Specified user does not exist.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 if realname == '': realname = user.getRealName () if email == '': email = user.getEMail () 
Self._v_user_db.change_user_other (user, realname, email) if REQUEST: return Self.manage_dialog ( title ='User changed', message='User data are changed.', action =REQUEST['URL1']+'/manage_users_edit', hfields = { 'username': username }, target ='manage_main') else: return 1 def manage_users_change_password (Self, REQUEST, username, password, password_type): """ Changes user password (managment) """ Self.__check_connection () user = Self.getUser (username) if not user: if REQUEST: return Globals.MessageDialog( title ='User does not exist', message='Error: Specified user does not exist.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 Self._v_user_db.change_user_password (user, password, password_type) if REQUEST: return Self.manage_dialog ( title ='User changed', message='User password is changed.', action =REQUEST['URL1']+'/manage_users_edit', hfields = { 'username': username }, target ='manage_main') else: return 1 def manage_users_change_domains (Self, REQUEST, username, domains): """ Changes user domains (managment) """ Self.__check_connection () if type (domains) == type (""): domains = string.split (domains) if not Self.domainSpecValidate (domains): if REQUEST: return Globals.MessageDialog( title ='Invalid domain spec', message='Domain specification is invalid', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 user = Self.getUser (username) if not user: if REQUEST: return Globals.MessageDialog( title ='User does not exist', message='Error: Specified user does not exist.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 Self._v_user_db.change_user_domains (user, domains) if REQUEST: return Self.manage_dialog ( title ='User changed', message='User domains are changed.', action =REQUEST['URL1']+'/manage_users_edit', hfields = { 'username': username }, target ='manage_main') else: return 1 def manage_users_delete (Self, REQUEST, username): """ Deletes user (managment) """ 
Self.__check_connection () ok = Self._v_user_db.del_user (username) if not ok: if REQUEST: return Globals.MessageDialog( title ='Error deleting user', message='Specified user is not deleted.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 if REQUEST: return Globals.MessageDialog( title ='User deleted', message='Specified user is deleted.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 1 def manage_role_create (Self, role, REQUEST = None): """ Creates role (managment) """ Self.__check_connection () ok = Self._v_user_db.create_role (role) parent = Self.aq_parent try: Self._addRoles (parent) except AttributeError: pass if not ok: if REQUEST: return Globals.MessageDialog( title ='Error creating role', message='Specified role is not created.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 if REQUEST: return Globals.MessageDialog( title ='Role created', message='Specified role is created.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 1 def manage_role_delete (Self, role, REQUEST = None): """ Deletes role (managment) """ Self.__check_connection () ok = Self._v_user_db.del_role (role) parent = Self.aq_parent Self._delRoles (parent, [role]) if not ok: if REQUEST: return Globals.MessageDialog( title ='Error deleting role', message='Specified role is not deleted.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 0 if REQUEST: return Globals.MessageDialog( title ='Role deleted', message='Specified role is deleted.', action =REQUEST['URL1']+'/manage_users', target ='manage_main') else: return 1 # ------------------------ # "user" methods # # All functions besides user_login () and user_logout () are protected by # MYSQL_USER_EDIT_PERMISSION permission. Function user_login () is available # to everyone, and user_logout () is available to all mysqlUF # authenticated users. 
user_login__roles__ = None def user_login (Self, REQUEST, RESPONSE, username, password, create_global = 0, login_goto_path = "docUserPage"): """ Authenticates user """ Self.__check_connection () if not Self.use_cookies: raise 'Forbidden', \ "You are not authorized to access this document " \ "(cookies are disabled)." util.log_debug ("user_login (): begin (%s)" % username) user = Self.authenticate_user_pass (username, password, REQUEST) if not user: if Self.use_sessions: if Self.session_log_simple: session = Self.__get_session_from_request (REQUEST) addr = util.get_address_from_request (REQUEST) # session can be None log.session_log_login_failed (Self._v_db, Self.realm, session, username, password, addr) util.log_debug ("user_login (): user not authentificated.") REQUEST ['login_goto'] = login_goto_path return Self.docLogin (Self, REQUEST, login_failed = 1) session = None if Self.use_sessions: session = Self.__get_session_from_request (REQUEST) if session: if not session.isAnonymousUser (): Self.__terminate_session (REQUEST, RESPONSE, session) session = None util.log_debug ("user_login (): user authentificated.") Self._v_token_db.request_assign_new_token (RESPONSE, 'user', user.getDBId (), Self.user_token_timeout, Self.user_token_life, Self.user_token_persistent) if session: assert (session.getUserId () == cfg.VDB_INVALID_ID) if Self.session_log_simple: addr = util.get_address_from_request (REQUEST) log.session_log_login (Self._v_db, Self.realm, session, username, addr) if login_goto_path: RESPONSE.redirect (login_goto_path) user_logout__roles__ = [cfg.MYSQL_USER_ROLE] def user_logout (Self, REQUEST, RESPONSE, logout_goto_path = "/"): """ Terminates user session """ Self.__check_connection () if not Self.use_cookies: raise 'Forbidden', \ "You are not authorized to access this document " \ "(cookies are disabled)." 
user = Self.check_valid_mysql_user (REQUEST) if Self.use_sessions: session = Self.__get_session_from_request (REQUEST) if session: Self.__terminate_session (REQUEST, RESPONSE, session) Self._v_token_db.request_delete_token (RESPONSE, 'user', user.getDBId ()) if logout_goto_path: return RESPONSE.redirect (logout_goto_path) # Protected by MYSQL_USER_EDIT_PERMISSION. def user_change_other_data (Self, REQUEST, realname, email): """ Change user's other data """ Self.__check_connection () user = Self.check_valid_mysql_user (REQUEST) if (user.getRealName () == realname) and (user.getEMail () == email): return 1 ret = Self._v_user_db.change_user_other (user, realname, email) return ret # Protected by MYSQL_USER_EDIT_PERMISSION. def user_change_password (Self, REQUEST, new_pass): """ Changes user's password """ Self.__check_connection () user = Self.check_valid_mysql_user (REQUEST) ret = Self._v_user_db.change_user_password (user, new_pass) return ret # Protected by MYSQL_USER_EDIT_PERMISSION. def user_create (Self, REQUEST, username, realname, email, password): """ Anonymous creation """ Self.__check_connection () if not Self.anon_create_ok: raise 'Forbidden', \ "You are not authorized to access this document." if Self.anon_create_role: roles = [Self.anon_create_role] else: roles = [] ok = Self.create_user (username, password, Self.default_password_type, roles, realname, email) return ok Globals.default__class_init__ (mysqlUserFolder)