# -*- encoding: iso-8859-1 -*- ############################################################################## # # Copyright (c) 2004 TINY SPRL. (http://tiny.be) All Rights Reserved. # # $Id$ # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsability of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # garantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## # . 
# Fields:
#      - simple
#      - relations (one2many, many2one, many2many)
#      - function
#
# Fields Attributes:
#   _classic_read: is a classic sql field
#   _type   : field type
#   readonly
#   required
#   size
#
import string
import netsvc

import psycopg


def _symbol_set(symb):
    """Convert a Python value into the form handed to the SQL driver.

    None and any value equal to False map to None, unicode objects are
    encoded to UTF-8 byte strings and everything else goes through str().
    """
    # ``== False`` (not ``is False``) is kept on purpose: 0 and 0.0 compare
    # equal to False and have always been turned into None by this helper.
    if symb is None or symb == False:
        return None
    elif isinstance(symb, unicode):
        return symb.encode('utf-8')
    return str(symb)


class _column(object):
    """Base class of every field type defined in this module.

    ``_symbol_set`` is a pair (SQL placeholder, conversion function) used by
    set() to build the update statement and to convert the value.
    """
    _classic_read = True
    _classic_write = True
    _properties = False
    _type = 'unknown'
    _symbol_c = '%s'
    # at this point ``_symbol_set`` still names the module-level function
    _symbol_f = _symbol_set
    _obj = None
    _symbol_set = (_symbol_c, _symbol_f)

    def __init__(self, string='unknown', required=False, readonly=False, domain=None, context='', states=None, priority=0, change_default=False, size=None, ondelete="set null", translate=False, **args):
        """Store the generic column attributes on the instance.

        ``domain`` and ``states`` default to a fresh empty list / dict for
        each instance. Any extra keyword argument whose value is truthy is
        set as an attribute of the same name.
        """
        # The defaults used to be the literals [] and {}, which are created
        # once and would have been shared by every column built without them.
        if states is None:
            states = {}
        if domain is None:
            domain = []
        self.states = states
        self.string = string
        self.readonly = readonly
        self.required = required
        self.size = size
        self.priority = priority
        self.change_default = change_default
        self.ondelete = ondelete
        self.translate = translate
        self._domain = domain
        self.relate = False
        self._context = context
        for a in args:
            if args[a]:
                setattr(self, a, args[a])

    def set(self, cr, obj, id, name, value, user=None):
        """Write ``value`` into column ``name`` of row ``id`` of obj._table.

        The value is converted with the function stored in
        ``self._symbol_set`` and passed to cr.execute() as a parameter.
        """
        cr.execute('update '+obj._table+' set '+name+'='+self._symbol_set[0]+' where id=%d', (self._symbol_set[1](value), id))

    def get(self, cr, obj, ids, name, context=None):
        """Not implemented on the base class; always raises.

        Raises NotImplementedError (a string used to be raised here, which
        is itself a TypeError on Python >= 2.6).
        """
        raise NotImplementedError('undefined get method !')

    def search(self, cr, obj, args, name, value, offset=0, limit=2000, uid=None):
        """Return the ``name`` value of the records of ``obj`` matching ``args``,
        the column domain and ``name ilike value``.

        ``offset`` and ``limit`` are accepted but not used here.
        """
        ids = obj.search(cr, uid, args+self._domain+[(name, 'ilike', value)])
        res = obj.read(cr, uid, ids, [name])
        return [x[name] for x in res]


# ---------------------------------------------------------
# Simple fields
# ---------------------------------------------------------

def _symbol_f(val):
    """Return the text 'True' when ``val`` is truthy, 'False' otherwise."""
    if val:
        return 'True'
    else:
        return 'False'


class boolean(_column):
    """Boolean column; values are written as the text 'True' / 'False'."""
    _type = 'boolean'
    _symbol_c = '%s'
    # ``_symbol_f`` is the module-level function defined just above
    _symbol_set = (_symbol_c, _symbol_f)


class integer(_column):
    """Integer column; falsy values are written as 0."""
    _type = 'integer'
    _symbol_f = lambda x: int(x or 0)
    _symbol_c = '%d'
    _symbol_set = (_symbol_c, _symbol_f)


class reference(_column):
    """Column of type 'reference'; ``selection`` is forwarded to _column as an
    extra keyword argument."""
    _type = 'reference'

    def __init__(self, string, selection, size, **args):
        _column.__init__(self, string=string, size=size, selection=selection, **args)


class char(_column):
    """Character column limited to ``size`` characters."""
    _type = 'char'

    def __init__(self, string, size, **args):
        _column.__init__(self, string=string, size=size, **args)
        # per-instance pair so that the conversion can read self.size
        self._symbol_set = (self._symbol_c, self._symbol_set_char)

    # takes a string (encoded in utf8) and returns a string (encoded in utf8)
    def _symbol_set_char(self, symb):
        """Return ``symb`` as a UTF-8 byte string no longer than self.size
        characters, or None for None / values equal to False.

        Values that are too long are cut and end with '...' when the size
        leaves room for it (size >= 3); for smaller sizes they are simply
        cut to ``size`` characters. A size of None disables truncation.
        """
        #TODO:
        # * we need to remove the "symb==False" from the next line BUT
        #   for now too many things rely on this broken behavior
        # * the symb==None test should be common to all data types
        if symb is None or symb == False:
            return None

        # we need to convert the string to a unicode object to be able
        # to evaluate its length (and possibly truncate it) reliably
        if isinstance(symb, str):
            u_symb = unicode(symb, 'utf8')
        elif isinstance(symb, unicode):
            u_symb = symb
        else:
            u_symb = unicode(symb)

        size = self.size
        if size is None or len(u_symb) <= size:
            return u_symb.encode('utf8')
        if size >= 3:
            return u_symb[:size-3].encode('utf8') + '...'
        # no room for the ellipsis: size-3 would be negative and the slice
        # plus '...' would exceed the column size
        return u_symb[:size].encode('utf8')


class text(_column):
    """Column of type 'text'."""
    _type = 'text'


import __builtin__


class float(_column):
    """Float column; falsy values are written as 0.0.

    The class name hides the builtin float inside this module, hence the
    explicit use of __builtin__.float for the conversion.
    """
    _type = 'float'
    _symbol_f = lambda x: __builtin__.float(x or 0.0)
    _symbol_c = '%f'
    _symbol_set = (_symbol_c, _symbol_f)

    def __init__(self, string='unknown', digits=None, **args):
        _column.__init__(self, string=string, **args)
        self.digits = digits


class date(_column):
    """Column of type 'date'."""
    _type = 'date'


class datetime(_column):
    """Column of type 'datetime'."""
    _type = 'datetime'


class time(_column):
    """Column of type 'time'."""
    _type = 'time'


def _symb_set_binary(symb):
    """Wrap ``symb`` in psycopg.Binary; None and values equal to False give None."""
    if symb is None or symb == False:
        return None
    else:
        return psycopg.Binary(symb)


class binary(_column):
    """Binary column; values are wrapped with psycopg.Binary before writing."""
    _type = 'binary'
    _symbol_set = ('%s', _symb_set_binary)


class selection(_column):
    """Column of type 'selection'; ``selection`` is stored as given."""
    _type = 'selection'

    def __init__(self, selection, string='unknown', **args):
        _column.__init__(self, string=string, **args)
        self.selection = selection

    def set(self, cr, obj, id, name, value, user=None):
        """Write ``value`` through _column.set().

        Raises ValueError('BAD VALUE') when ``value in self.selection`` holds
        (a string used to be raised, which fails on Python >= 2.6).
        """
        # NOTE(review): this membership test is kept exactly as it was. It
        # compares ``value`` with the raw entries of self.selection and looks
        # inverted / ineffective -- confirm the intended validation rule
        # before changing it.
        if value in self.selection:
            raise ValueError('BAD VALUE')
        # forward the caller's user instead of discarding it
        _column.set(self, cr, obj, id, name, value, user=user)


# ---------------------------------------------------------
# Relationals fields
# ---------------------------------------------------------

#
# Values: (0, 0,  { fields })    create
#         (1, ID, { fields })    modification
#         (2, ID)                remove (delete)
#         (3, ID)                unlink one (target id or target of relation)
#         (4, ID)                link
#         (5)                    unlink all (only valid for one2many)
#
#CHECKME: in practice, which syntax is used for 5? (5) or (5, 0)?
class one2one(_column):
    """Relation column of type 'one2one' pointing at the object named ``obj``."""
    _classic_read = False
    _classic_write = True
    _type = 'one2one'

    def __init__(self, obj, string='unknown', **args):
        _column.__init__(self, string=string, **args)
        self._obj = obj

    def set(self, cr, obj_src, id, field, act, user=None):
        """Create or update the related record.

        When act[0] is 0 a record is created from act[1] and its id is
        stored in ``field`` of row ``id``; otherwise the id currently stored
        in ``field`` is read back and that record is written with act[1].
        """
        obj = obj_src.pool.get(self._obj)
        self._table = obj_src.pool.get(self._obj)._table
        if act[0]==0:
            id_new = obj.create(cr, user, act[1])
            cr.execute('update '+obj_src._table+' set '+field+'=%d where id=%d', (id_new, id))
        else:
            # NOTE(review): the row is selected with act[0], not with ``id``;
            # kept as found -- confirm which one the callers expect.
            cr.execute('select '+field+' from '+obj_src._table+' where id=%d', (act[0],))
            target_id = cr.fetchone()[0]
            obj.write(cr, user, [target_id], act[1])

    def search(self, cr, obj, args, name, value, offset=0, limit=2000, uid=None):
        """Search the related object on ``args``, the column domain and
        ``name like value``."""
        return obj.pool.get(self._obj).search(cr, uid, args+self._domain+[('name', 'like', value)])


class many2one(_column):
    """Relation column of type 'many2one' pointing at the object named ``obj``."""
    _classic_read = False
    _classic_write = True
    _type = 'many2one'

    def __init__(self, obj, string='unknown', **args):
        _column.__init__(self, string=string, **args)
        self._obj = obj

    #
    # TODO: speed improvement
    #
    def get(self, cr, obj, ids, name, user=None, context=None):
        """Return {id: (target_id, target_name)} for the rows in ``ids``.

        Rows whose column is empty keep the raw stored value. The names come
        from name_get() on the related object; False is used when name_get()
        returns nothing for a target. An empty ``ids`` gives an empty dict.
        """
        if not ids:
            # 'id in ()' is not valid SQL
            return {}
        cr.execute('select id,'+name+' from '+obj._table+' where id in ('+','.join(map(str, ids))+')')
        res = dict(cr.fetchall())
        obj = obj.pool.get(self._obj)
        names = dict(obj.name_get(cr, user, [v for v in res.values() if v]))
        for r in res.keys():
            if res[r]:
                res[r] = (res[r], names.get(res[r], False))
        return res

    def set(self, cr, obj_src, id, field, values, user=None):
        """Update the link stored in ``field`` of row ``id``.

        ``values`` is either a list of action tuples (see the table above
        the relational classes) or a single target id; a falsy single value
        sets the column to null.
        """
        obj = obj_src.pool.get(self._obj)
        self._table = obj_src.pool.get(self._obj)._table
        if isinstance(values, list):
            for act in values:
                if act[0]==0:
                    # create/write take the user as second argument, as in
                    # the other relation classes of this module
                    id_new = obj.create(cr, user, act[2])
                    cr.execute('update '+obj_src._table+' set '+field+'=%d where id=%d', (id_new, id))
                elif act[0]==1:
                    obj.write(cr, user, [act[1]], act[2])
                elif act[0]==2:
                    cr.execute('delete from '+self._table+' where id=%d', (act[1],))
                elif act[0]==3 or act[0]==5:
                    cr.execute('update '+obj_src._table+' set '+field+'=null where id=%d', (id,))
                elif act[0]==4:
                    cr.execute('update '+obj_src._table+' set '+field+'=%d where id=%d', (act[1], id))
        else:
            if values:
                cr.execute('update '+obj_src._table+' set '+field+'=%d where id=%d', (values, id))
            else:
                cr.execute('update '+obj_src._table+' set '+field+'=null where id=%d', (id,))

    def search(self, cr, obj, args, name, value, offset=0, limit=2000, uid=None):
        """Search the related object on ``args``, the column domain and
        ``name like value``."""
        return obj.pool.get(self._obj).search(cr, uid, args+self._domain+[('name', 'like', value)])


class one2many(_column):
    """Relation column of type 'one2many'; ``fields_id`` names the column of
    the related object that points back at this one."""
    _classic_read = False
    _classic_write = False
    _type = 'one2many'

    def __init__(self, obj, fields_id, string='unknown', limit=5000, **args):
        _column.__init__(self, string=string, **args)
        self._obj = obj
        self._fields_id = fields_id
        self._limit = limit
        #one2many can't be used as condition for defaults
        assert(self.change_default != True)

    #
    # TODO: speed improvement
    #
    def get(self, cr, obj, ids, name, user=None, offset=0, context=None):
        """Return {id: [related ids]} for every id in ``ids``.

        ``offset`` and self._limit are not applied here.
        """
        res = {}
        for id in ids:
            res[id] = []
        related = obj.pool.get(self._obj)
        ids2 = related.search(cr, user, [(self._fields_id, 'in', ids)])
        for r in related.read(cr, user, ids2, [self._fields_id]):
            # the back reference is indexed with [0] to obtain the parent id
            res[r[self._fields_id][0]].append(r['id'])
        return res

    def set(self, cr, obj, id, field, values, user=None):
        """Apply the list of action tuples ``values`` to the records related
        to row ``id`` (see the action table above the relational classes)."""
        _table = obj.pool.get(self._obj)._table
        obj = obj.pool.get(self._obj)
        for act in values:
            if act[0]==0:
                # the caller's dict receives the back reference before create
                act[2][self._fields_id] = id
                obj.create(cr, user, act[2])
            elif act[0]==1:
                obj.write(cr, user, [act[1]], act[2])
            elif act[0]==2:
                obj.unlink(cr, user, [act[1]])
            elif act[0]==3:
                cr.execute('update '+_table+' set '+self._fields_id+'=null where id=%d', (act[1],))
            elif act[0]==4:
                cr.execute('update '+_table+' set '+self._fields_id+'=%d where id=%d', (id, act[1]))
            elif act[0]==5:
                cr.execute('update '+_table+' set '+self._fields_id+'=null where '+self._fields_id+'=%d', (id,))
            elif act[0]==6:
                #
                # TODO: change this in one line add remove
                #
                for act_nbr in act[2]:
                    cr.execute('update '+_table+' set '+self._fields_id+'=%d where id=%d', (id, act_nbr))

    def search(self, cr, obj, args, name, value, offset=0, limit=2000, uid=None, operator='like'):
        """Delegate to name_search() of the related object with ``value`` and
        the column domain; ``args`` and ``operator`` are not used."""
        return obj.pool.get(self._obj).name_search(cr, uid, value, self._domain)


#
# Values: (0, 0,  { fields })    create
#         (1, ID, { fields })    modification
#         (2, ID)                remove
#         (3, ID)                unlink
#         (4, ID)                link
#         (5, ID)                unlink all
#         (6, ?, ids)            set a list of links
#
class many2many(_column):
    """Relation column of type 'many2many'.

    Links live in table ``rel``; column ``id1`` holds the id of the record
    owning the field and ``id2`` the id of the related record (this is how
    get() and the create / replace actions of set() use them).
    """
    _classic_read = False
    _classic_write = False
    _type = 'many2many'

    def __init__(self, obj, rel, id1, id2, string='unknown', limit=5000, **args):
        _column.__init__(self, string=string, **args)
        self._obj = obj
        self._rel = rel
        self._id1 = id1
        self._id2 = id2
        self._limit = limit

    def get(self, cr, obj, ids, name, user=None, offset=0, context=None):
        """Return {id: [related ids]} read from the relation table.

        The limit / offset clause applies to the whole query, not per id.
        """
        res = {}
        if not ids:
            return res
        ids_s = ','.join(map(str, ids))
        cr.execute('select '+self._id2+','+self._id1+' from '+self._rel+' where '+self._id1+' in ('+ids_s+') limit %d offset %d', (self._limit, offset))
        for id in ids:
            res[id] = []
        for r in cr.fetchall():
            res[r[1]].append(r[0])
        return res

    def set(self, cr, obj, id, name, values, user=None):
        """Apply the list of action tuples ``values`` to the links of row ``id``.

        Actions 3, 4 and 5 now address the relation table the same way as
        get() and actions 0 and 6 do (id1 = owning record, id2 = related
        record); they previously used the two columns swapped or updated
        rows to null instead of removing / inserting the link.
        """
        obj = obj.pool.get(self._obj)
        for act in values:
            if act[0]==0:
                idnew = obj.create(cr, user, act[2])
                cr.execute('insert into '+self._rel+' ('+self._id1+','+self._id2+') values (%d,%d)', (id, idnew))
            elif act[0]==1:
                obj.write(cr, user, [act[1]], act[2])
            elif act[0]==2:
                obj.unlink(cr, user, [act[1]])
            elif act[0]==3:
                # unlink: remove only the (id, act[1]) pair
                cr.execute('delete from '+self._rel+' where '+self._id1+'=%d and '+self._id2+'=%d', (id, act[1]))
            elif act[0]==4:
                # link: drop any existing copy of the pair first so linking
                # the same record twice does not duplicate the row
                cr.execute('delete from '+self._rel+' where '+self._id1+'=%d and '+self._id2+'=%d', (id, act[1]))
                cr.execute('insert into '+self._rel+' ('+self._id1+','+self._id2+') values (%d,%d)', (id, act[1]))
            elif act[0]==5:
                # unlink all: same statement as the first step of action 6
                cr.execute('delete from '+self._rel+' where '+self._id1+'=%d', (id,))
            elif act[0]==6:
                cr.execute('delete from '+self._rel+' where '+self._id1+'=%d', (id, ))
                for act_nbr in act[2]:
                    cr.execute('insert into '+self._rel+' ('+self._id1+','+self._id2+') values (%d, %d)', (id, act_nbr))

    #
    # TODO: use a name_search
    #
    def search(self, cr, obj, args, name, value, offset=0, limit=2000, uid=None, operator='like'):
        """Search the related object on ``args``, the column domain and
        ``name <operator> value``."""
        return obj.pool.get(self._obj).search(cr, uid, args+self._domain+[('name', operator, value)])


# ---------------------------------------------------------
# Function fields
# ---------------------------------------------------------
class function(_column):
    """Column whose value is produced by the callable ``fnct``.

    The column is marked read-only when no inverse function ``fnct_inv`` is
    given. ``type`` becomes the column ``_type``.
    """
    _classic_read = False
    _classic_write = False
    _type = 'function'
    _properties = True

    def __init__(self, fnct, arg=None, fnct_inv=None, fnct_inv_arg=None, type='float', fnct_search=None, obj=None, method=False, **args):
        _column.__init__(self, **args)
        self._obj = obj
        self._method = method
        self._fnct = fnct
        self._fnct_inv = fnct_inv
        self._arg = arg
        self._fnct_inv_arg = fnct_inv_arg
        if not fnct_inv:
            self.readonly = 1
        self._type = type
        self._fnct_search = fnct_search

    def search(self, cr, uid, obj, name, args):
        """Return the result of ``fnct_search``, or [] when none was given.

        Note that the parameter order differs from the search() of the
        other column classes.
        """
        if not self._fnct_search:
            return []
        return self._fnct_search(obj, cr, uid, obj, name, args)

    def get(self, cr, obj, ids, name, user=None, context=None):
        """Return whatever ``fnct`` computes for ``ids``.

        With ``method`` set, fnct is called as
        fnct(obj, cr, user, ids, name, arg, context); otherwise as
        fnct(cr, table, ids, name, arg, context).
        """
        # a fresh dict per call: the callback receives it and may modify it
        if context is None:
            context = {}
        table = obj._table
        if self._method:
            # TODO get HAS to receive uid for permissions !
            return self._fnct(obj, cr, user, ids, name, self._arg, context)
        else:
            return self._fnct(cr, table, ids, name, self._arg, context)

    def set(self, cr, obj, id, name, value, user=None):
        """Call ``fnct_inv`` with the new value; does nothing without one."""
        if self._fnct_inv:
            table = obj._table
            self._fnct_inv(cr, table, id, name, value, self._fnct_inv_arg)