""" Database connection """ __version__ = "$Rev: 71 $" # $URL: svn://localhost/mysqlUserFolder/trunk/db.py $ # # $LastChangedBy: vladap $ # $LastChangedDate: 2005-01-10 10:35:15 +0200 (Mon, 10 Jan 2005) $ import string import MySQLdb import MySQLdb.constants.CR import util, cfg DB_HOSED_CONNECTION = ( MySQLdb.constants.CR.SERVER_GONE_ERROR, MySQLdb.constants.CR.SERVER_LOST ) # --------------------------------------------------------------------- */ # Utility functions def setup_user_roles (roles): roles.append (cfg.MYSQL_USER_ROLE) roles.append (cfg.ZOPE_ANONYMOUS_ROLE) roles.append (cfg.ZOPE_AUTHENTICATED_ROLE) iroles = cfg.VALIDATE_IGNORE_ROLES if len (iroles) > 0: new_roles = filter (lambda x, iroles=iroles: not (x in iroles), roles) else: new_roles = roles return new_roles # --------------------------------------------------------------------- */ # dbConnection class class dbConnection: """ DB Connection class. It is supposed to be refferenced as _v_ attribute of all classes that need access to a database. So this class does not use locks. dbConnection methods will lock MySQL tables in case they need to access more than one table. Because MySQL does not support transactions loosing connection in the middle of the transaction could create problems. """ def __init__ (Self, server, port, socket, db_name, user, password): Self.server = server Self.port = port Self.socket = socket Self.db_name = db_name Self.user = user Self.password = password Self.db_con = None if not Self.port: Self.port = 3306 Self.db_error = 0 #-------------------------------------------- # Internal general utility functions def __autoincrement_id (Self, cur): #return cur.insert_id () return cur.lastrowid def __lock_tables (Self, tb_list): l = len (tb_list) query = "lock tables " for i in range (l): (table, mode) = tb_list [i] if mode == "r": mode_str = "read" else: mode_str = "write" v = "%s %s" % (table, mode_str) query = query + v if i != (l-1): query = query + ", " Self.__do_query (query, ()) def __unlock_tables (Self): Self.__do_query ("unlock tables", ()) def __do_query (Self, query, param, many = 0): if Self.db_con is None: Self.connect () if Self.db_con is None: res = None else: res = Self.db_con.cursor () try: if not many: res.execute (query, param) else: res.executemany (query, param) except MySQLdb.MySQLError, e: if e [0] in DB_HOSED_CONNECTION: Self.connect () return Self.__do_query (query, param, many) Self.db_error = 1 util.log_error ("Database error: %s" % e) raise return res def __query_list_field (Self, query, args): """ This function returns a list of values from the select query. Result set should contain only one field. """ cur = Self.__do_query (query, args) if cur is None: return [] data = cur.fetchall () ret = [] for i in range (len (data)): (value,) = data [i] if value: ret.append (value) return ret def __query_count (Self, table, where_clause = None, args = None): """ Returns number of rows in the recordset specified by where clause. """ query = 'select count(*) from %s' % (table) if where_clause: query = '%s where %s' % (query, where_clause) cur = Self.__do_query (query, args) if cur is None: return [] data = cur.fetchall () return data [0][0] def __affected_rows (Self, cur): return cur.rowcount #-------------------------------------------- # Connect / disconnect, has_errors flag. def connect (Self): if Self.socket == '': util.log_debug ("db.connect (): using TCP.") Self.db_con = MySQLdb.connect (host = Self.server, port = Self.port, user = Self.user, passwd = Self.password, db = Self.db_name) else: util.log_debug ("db.connect (): using socket.") Self.db_con = MySQLdb.connect (host = Self.server, unix_socket = Self.socket, user = Self.user, passwd = Self.password, db = Self.db_name) Self.db_error = 0 return 1 def disconnect (Self): if not Self.db_con is None: Self.db_con.close () Self.db_con = None def has_errors (Self): return Self.db_error #-------------------------------------------- # Internal User db functions. def __check_user (Self, user): username = user.getUserName () realm = user.getRealm () id = user.getDBId () where = "Username = %s and Realm = %s and Id = %s" cnt = Self.__query_count ("Users", where, (username, realm, id)) return (cnt == 1) def __check_user_exist (Self, username, realm): where = "Username = %s and Realm = %s" cnt = Self.__query_count ("Users", where, (username, realm)) return (cnt == 1) #-------------------------------------------- # User db API functions. def list_rolenames (Self, realm): query = "select Rolename from Roles where Realm = %s order by Rolename" ret = Self.__query_list_field (query, (realm, )) return ret def list_usernames (Self, realm): query = "select Username from Users where Realm = %s order by Username" return Self.__query_list_field (query, (realm, )) def list_usernames_with_role (Self, realm, role): Self.__lock_tables ([('Users', 'r'), ('Roles', 'r'), ('UserRoles', 'r')]) query = "Select id from Roles where Realm = %s and Rolename = %s" [rid, ] = Self.__query_list_field (query, (realm, role)) query = "Select Users.Username from Users,UserRoles where " \ "(Users.Id = UserRoles.Id_User) and (UserRoles.Id_Role = %s)" \ "and (Users.Realm = %s) order by Username" ret = Self.__query_list_field (query, (rid, realm)) Self.__unlock_tables () return ret def get_user_data (Self, user, by_id = 0): username = user.getUserName () realm = user.getRealm () if by_id: uid = user.getDBId () user._setDBId (cfg.VDB_INVALID_ID) user._setPassword ('', 0) user._setAuthData ([], []) user._setOtherData ('', '') # ----- Trazenje usera Self.__lock_tables ([("Users", "r"), ("Roles", "r"), ("UserRoles", "r")]) if by_id: query = "Select Username, Realm from Users where Id = %s" cur = Self.__do_query (query, (uid, )) if not cur: Self.__unlock_tables () util.log_debug ("dbConn.get_user_data (): no cursor") return 0 data = cur.fetchall () if len (data) == 0: Self.__unlock_tables () util.log_debug ("dbConn.get_user_data (): " "no user data by id: '%i'" % uid) return 0 (du, dr) = data [0] username = du user._setUserName (username) if dr != realm: Self.__unlock_tables () util.log_debug ("dbConn.get_user_data (): " "invalid realm for user id") return 0 query = "Select Id, Password, PasswordType, EMail, Realname, " \ "Domains from Users where Realm = %s and Username = %s" cur = Self.__do_query (query, (realm, username)) if cur is None: Self.__unlock_tables () util.log_debug ("dbConn.get_user_data (): no cursor") return 0 data = cur.fetchall () if len (data) == 0: Self.__unlock_tables () util.log_debug ("dbConn.get_user_data (): no user data") return 0 (id, password, password_type, email, realname, domains_str) = data [0] if domains_str is None: domains_str = '' domains = string.split (string.strip (domains_str)) # ----- Find roles query = 'Select Id_Role from UserRoles where Id_User = %s' role_ids = Self.__query_list_field (query, (id, )) roles = [] for role_id in role_ids: query = "Select Rolename from Roles where Id = %s" (role,) = Self.__query_list_field (query, (role_id, )) roles.append (role) roles = setup_user_roles (roles) # ----- Popunjavanje strukture user._setDBId (id) user._setOtherData (realname, email) user._setAuthData (roles, domains) user._setPassword (password, password_type) Self.__unlock_tables () return 1 def count_users (Self, realm): where = "Realm = %s" table = "Users" return Self.__query_count (table, where, (realm, )) def count_roles (Self, realm): where = "Realm = %s" table = "Roles" return Self.__query_count (table, where, (realm, )) def change_user_password (Self, user, password, ptype): Self.__lock_tables ([("Users", "w")]) if not Self.__check_user (user): Self.__unlock_tables () return 0 id = user.getDBId () query = "update Users set " \ "Password = %s, PasswordType = %s where Id = %s" cur = Self.__do_query (query, (password, ptype, id)) if not cur: ret = 0 else: ret = (Self.__affected_rows (cur) == 1) Self.__unlock_tables () return ret def change_user_roles (Self, user, roles): Self.__lock_tables ([("Users", "r"), ("Roles", "r"), ("UserRoles", "w")]) if not Self.__check_user (user): Self.__unlock_tables () return 0 user_id = user.getDBId () realm = user.getRealm () query = "delete from UserRoles where Id_User = %s" Self.__do_query (query, (user_id,)) for role in roles: query = \ "Select Id from Roles where Rolename = %s and Realm = %s" ret = Self.__query_list_field (query, (role, realm)) if ret != []: [role_id] = ret query = \ "insert into UserRoles (Id_Role, Id_User) Values (%s, %s)" Self.__do_query (query, (role_id, user_id)) else: Self.__unlock_tables () raise 'Unknown role: %s' % (role) Self.__unlock_tables () return 1 def change_user_domains (Self, user, domains): if (type (domains) == type ([])) or (type (domains) == type (())): domains = string.join (domains) Self.__lock_tables ([("Users", "w")]) if not Self.__check_user (user): Self.__unlock_tables () return 0 id = user.getDBId () query = "update Users set Domains = %s where Id = %s" cur = Self.__do_query (query, (domains, id)) if not cur: ret = 0 else: ret = (Self.__affected_rows (cur) == 1) Self.__unlock_tables () return ret def change_user_other (Self, user, realname, email): Self.__lock_tables ([("Users", "w")]) if not Self.__check_user (user): Self.__unlock_tables () return 0 id = user.getDBId () query = "update Users set " \ "Realname = %s, EMail = %s where Id = %s" cur = Self.__do_query (query, (realname, email, id)) if not cur: ret = 0 else: ret = (Self.__affected_rows (cur) == 1) Self.__unlock_tables () return ret def create_user (Self, username, realm, roles): Self.__lock_tables ([("Users", "w"), ("Roles", "r"), ("UserRoles", "w")]) if Self.__check_user_exist (username, realm): Self.__unlock_tables () return 0 query = "insert into Users (Username, Realm, Password, " \ "PasswordType) values (%s, %s, %s, %s)" cur = Self.__do_query (query, (username, realm, '', cfg.VDB_INVALID_PASSWORD_TYPE)) user_id = int (Self.__autoincrement_id (cur)) for role in roles: query = \ "Select Id from Roles where Rolename = %s and Realm = %s" ret = Self.__query_list_field (query, (role, realm)) if ret != []: [role_id] = ret query = \ "insert into UserRoles (Id_Role, Id_User) Values (%s, %s)" Self.__do_query (query, (role_id, user_id)) else: Self.__unlock_tables () Self.del_user (username, realm) raise 'Unknown role: (%s, %s)' % (role, realm) Self.__unlock_tables () return 1 def del_user (Self, username, realm, mclass = None): Self.__lock_tables ([("Users", "w"), ("UserRoles", "w"), ("MiscData", "w"), ("Sessions", "w"), ("Tokens", "w")]) query = "select Id from Users where Username = %s and Realm = %s" ret = Self.__query_list_field (query, (username, realm)) if ret == []: Self.__unlock_tables () return 0 [id] = ret query = "select Id from Sessions where Id_User = %s" sids = Self.__query_list_field (query, (id, )) for sid in sids: Self.del_token ('session', sid, realm) Self.del_token ('user', id, realm) query = "delete from Sessions where Id_User = %s" Self.__do_query (query, (id, )) query = "delete from UserRoles where Id_User = %s" Self.__do_query (query, (id, )) query = "delete from Users where Id = %s" Self.__do_query (query, (id, )) if mclass: Self.del_misc_data_class (mclass, id) Self.__unlock_tables () return 1 def create_role (Self, role, realm): query = "insert into Roles (Rolename, Realm) values (%s, %s)" Self.__do_query (query, (role, realm)) return 1 def del_role (Self, role, realm): Self.__lock_tables ([('Roles', 'w'), ('UserRoles', 'w')]) query = "Select Id from Roles where Rolename = %s and Realm = %s" ret = Self.__query_list_field (query, (role, realm)) if ret == []: Self.__unlock_tables () return 0 [role_id] = ret query = "delete from Roles where Id = %s" Self.__do_query (query, (role_id,)) query = "delete from UserRoles where Id_Role = %s" Self.__do_query (query, (role_id,)) Self.__unlock_tables () return 1 #-------------------------------------------- # Session db API functions def create_session (Self, session): user_id = session.getUserId () query = "insert into Sessions (id_user) values (%s)" cur = Self.__do_query (query, (user_id, )) id = int (Self.__autoincrement_id (cur)) session._setId (id) def get_session (Self, sess): id = sess.getId () query = "select Id_User from Sessions where Id = %s" res = Self.__query_list_field (query, (id, )) if not res: sess._setId (cfg.VDB_INVALID_ID) sess._setUserId (cfg.VDB_INVALID_ID) return sess assert (len (res) == 1) sess._setUserId (res [0]) def session_set_user_id (Self, session, user_id): query = "update Sessions set Id_User = %s where Id = %s" Self.__do_query (query, (user_id, session.getId ())) #-------------------------------------------- # Token db API functions def del_token (Self, class_id, token_id, realm): query = "delete from Tokens where Class = %s " \ "and Id = %s and Realm = %s" Self.__do_query (query, (class_id, token_id, realm)) def create_token (Self, class_id, token_id, token_value, token_timeout, token_life, created_time, accessed_time, realm): Self.__lock_tables ([('Tokens', 'w')]) Self.del_token (class_id, token_id, realm) query = "insert into Tokens (Class, Id, Value, Created, " \ "Accessed, Timeout, Life, Realm) values " \ "(%s, %s, %s, %s, %s, %s, %s, %s)" values = (class_id, token_id, token_value, created_time, accessed_time, token_timeout, token_life, realm) Self.__do_query (query,values) Self.__unlock_tables () def get_token (Self, class_id, token_id, realm): query = "select Value, Created, Accessed, Timeout, Life " \ "from Tokens where Class = %s " \ "and Id = %s and Realm = %s" cur = Self.__do_query (query, (class_id, token_id, realm)) if not cur: return None data = cur.fetchall () if len (data) == 0: return None assert (len (data) == 1) return data [0] def update_token_accessed (Self, class_id, token_id, time, realm): query = "update Tokens set Accessed = %s where Class = %s " \ "and Id = %s and Realm = %s" Self.__do_query (query, (time, class_id, token_id, realm)) #-------------------------------------------- # MiscData API functions def save_misc_data (Self, obj_class, obj_id, misc_id, misc_tuple): if type (misc_tuple) != type (()): raise 'API Error', \ "save_misc_data () should be called with tuple. " (iv, sv) = misc_tuple Self.__lock_tables ([('MiscData', "w")]) data = Self.get_misc_data (obj_class, obj_id, misc_id, check=1) if data: query = "update MiscData set " \ "Value_int = %s, Value_string = %s where " \ "OId = %s and OClass = %s and MId = %s" Self.__do_query (query, (iv, sv, obj_id, obj_class, misc_id)) else: query = "insert into MiscData (OId, OClass, MId, " \ "Value_int, Value_string) values (%s, %s, %s, %s, %s)" Self.__do_query (query, (obj_id, obj_class, misc_id, iv, sv)) Self.__unlock_tables () def get_misc_data (Self, oclass, oid, mid, check = 0): query = "select Value_int, Value_string from MiscData where " \ "OId = %s and OClass = %s and MId = %s" cur = Self.__do_query (query, (oid, oclass, mid)) if not cur: if not check: return cfg.MISC_DATA_EMPTY else: return 0 data = cur.fetchall () if not data: if not check: return cfg.MISC_DATA_EMPTY else: return 0 assert (len (data) == 1) r = data [0] if check: return 1 return r def del_misc_data (Self, oclass, oid, mid): query = "delete from MiscData where " \ "OId = %s and OClass = %s and MId = %s" Self.__do_query (query, (oid, oclass, mid)) def del_misc_data_class (Self, oclass, oid): query = "delete from MiscData where OId = %s and OClass = %s" Self.__do_query (query, (oid, oclass)) #-------------------------------------------- # Session logging def log_sql_simple (Self, realm, sid, event): query = "insert delayed into Log (Realm, Id_Session, Event) " \ "values (%s, %s, %s)" Self.__do_query (query, (realm, sid, event)) def log_sql (Self, realm, sid, event, e_value, e_value2, o_type, o_size, r_addr): query = "insert delayed into Log (Realm, Id_Session, Event, " \ "EValue, EValue2, OType, OSize, RAddr) values " \ "(%s, %s, %s, %s, %s, %s, %s, %s)" Self.__do_query (query, (realm, sid, event, e_value, e_value2, o_type, o_size, r_addr))