# -*- encoding: utf-8 -*- ############################################################################## # # Copyright (c) 2004 TINY SPRL. (http://tiny.be) All Rights Reserved. # # $Id: account.py 1005 2005-07-25 08:41:42Z nicoe $ # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsability of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # garantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
#
##############################################################################

import time
import netsvc
from osv import fields, osv
import ir


def _sql_id_list(ids):
    """Return ``ids`` as a comma-separated string usable inside ``IN (...)``.

    Every id is coerced with ``int()`` so that only numeric literals are ever
    concatenated into a query string.
    """
    return ','.join([str(int(i)) for i in ids])


def _many2one_id(value):
    """Return the id stored in a many2one value.

    The code in this file indexes many2one values coming from ``read()`` with
    ``[0]`` to obtain the id; false values and bare ids are returned as is.
    """
    if isinstance(value, (list, tuple)):
        return value and value[0]
    return value


def _lines_for_create(lines):
    """Turn rows read from a line table into values for a new parent record.

    Each dict is modified in place: ``id`` and ``invoice_id`` are removed and
    many2one values are reduced to their id. Fields that only exist on one of
    the two line tables (``uos_id``, ``invoice_line_tax_id``) are handled only
    when present, so the function works for both invoice lines and tax lines.
    Returns a list of ``(0, 0, values)`` tuples.
    """
    for line in lines:
        del line['id']
        del line['invoice_id']
        line['account_id'] = _many2one_id(line['account_id'])
        if 'uos_id' in line:
            line['uos_id'] = _many2one_id(line['uos_id'])
        if 'invoice_line_tax_id' in line:
            # presumably the ORM's "replace the relation with these ids"
            # command -- confirm against the many2many field implementation
            line['invoice_line_tax_id'] = [(6, 0, line['invoice_line_tax_id'])]
    return [(0, 0, line) for line in lines]


class account_invoice(osv.osv):
    """Invoices and refunds, for customers and suppliers."""

    def _amount_untaxed(self, cr, uid, ids, prop, unknow_none, unknow_dict):
        """Return ``{invoice_id: sum(price_unit * quantity)}`` over the lines."""
        if not ids:
            # "IN ()" is not valid SQL
            return {}
        cr.execute(
            "SELECT s.id,COALESCE(SUM(l.price_unit*l.quantity),0) AS amount "
            "FROM account_invoice s "
            "LEFT OUTER JOIN account_invoice_line l ON (s.id=l.invoice_id) "
            "WHERE s.id IN (" + _sql_id_list(ids) + ") GROUP BY s.id ")
        return dict(cr.fetchall())

    def _amount_tax(self, cr, uid, ids, prop, unknow_none, unknow_dict):
        """Return ``{invoice_id: sum(amount)}`` over the invoice tax lines."""
        if not ids:
            # "IN ()" is not valid SQL
            return {}
        cr.execute(
            "SELECT s.id,COALESCE(SUM(l.amount),0) AS amount "
            "FROM account_invoice s "
            "LEFT OUTER JOIN account_invoice_tax l ON (s.id=l.invoice_id) "
            "WHERE s.id IN (" + _sql_id_list(ids) + ") GROUP BY s.id ")
        return dict(cr.fetchall())

    def _amount_total(self, cr, uid, ids, prop, unknow_none, unknow_dict):
        """Return ``{invoice_id: untaxed amount + tax amount}``."""
        untax = self._amount_untaxed(cr, uid, ids, prop, unknow_none, unknow_dict)
        tax = self._amount_tax(cr, uid, ids, prop, unknow_none, unknow_dict)
        res = {}
        for inv_id in ids:
            res[inv_id] = untax.get(inv_id, 0.0) + tax.get(inv_id, 0.0)
        return res

    def _get_journal(self, cr, uid, context):
        """Return the id of one journal matching the invoice type in context.

        The invoice type is read from ``context['type']`` (``'out_invoice'``
        when absent). Returns False when no journal of the wanted type exists.
        """
        context = context or {}
        type_inv = context.get('type', 'out_invoice')
        type2journal = {
            'out_invoice': 'sale',
            'in_invoice': 'expense',
            'out_refund': 'expense',
            'in_refund': 'sale',
        }
        cr.execute("select id from account_journal where type=%s limit 1",
                   (type2journal.get(type_inv, 'sale'),))
        row = cr.fetchone()
        if not row:
            return False
        return row[0]

    _name = "account.invoice"
    _description = 'Invoices'
    _order = "number"
    _columns = {
        'name': fields.char('Invoice Description', size=64, required=True),
        'origin': fields.char('Origin', size=64),
        'type': fields.selection([
            ('out_invoice','Customer Invoice'),
            ('in_invoice','Supplier Invoice'),
            ('out_refund','Customer Refund'),
            ('in_refund','Supplier Refund'),
            ],'Type', readonly=True, states={'draft':[('readonly',False)]}),
        'number': fields.char('Invoice Number', size=32, readonly=True),
        'reference': fields.char('Invoice Reference', size=64),
        'project_id':fields.many2one('account.project', 'Profit/Cost Ctr', readonly=True, states={'draft':[('readonly',False)]}),
        'comment': fields.text('Additionnal Information'),
        'state': fields.selection([
            ('draft','Draft'),
            ('proforma','Pro-forma'),
            ('open','Open'),
            ('paid','Paid'),
            ('cancel','Canceled')
        ],'State', readonly=True),
        # NOTE(review): 'close' is not one of the values of the 'state'
        # selection above -- confirm whether another state was intended.
        'date_invoice':fields.date('Date Invoiced', required=True, states={'open':[('readonly',True)],'close':[('readonly',True)]}),
        'date_due':fields.date('Due Date', states={'open':[('readonly',True)],'close':[('readonly',True)]}),
        'partner_id':fields.many2one('res.partner', 'Partner', change_default=True, readonly=True, states={'draft':[('readonly',False)]}, relate=True),
        'address_contact_id':fields.many2one('res.partner.address', 'Contact Address', readonly=True, states={'draft':[('readonly',False)]}),
        'address_invoice_id':fields.many2one('res.partner.address', 'Invoice Address', readonly=True, states={'draft':[('readonly',False)]}),
        'partner_contact': fields.char('Partner Contact', size=64),
        'partner_ref': fields.char('Partner Reference', size=64),
        'payment_term': fields.many2one('account.payment.term', 'Payment Term'),
        'account_id':fields.many2one('account.account', 'Dest Account', required=True),
        'invoice_line': fields.one2many('account.invoice.line', 'invoice_id', 'Invoice Lines', readonly=True, states={'draft':[('readonly',False)]}),
        'tax_line': fields.one2many('account.invoice.tax', 'invoice_id', 'Tax Lines', readonly=True, states={'draft':[('readonly',False)]}),
        'move_id': fields.many2one('account.move', 'Invoice Movement', readonly=True),
        'amount_untaxed': fields.function(_amount_untaxed, method=True, string='Untaxed Amount'),
        'amount_tax': fields.function(_amount_tax, method=True, string='Tax Amount'),
        'amount_total': fields.function(_amount_total, method=True, string='Total Amount'),
        'journal_id' : fields.many2one('account.journal', 'Journal', required=True),
    }
    _defaults = {
        'type': lambda *a: 'out_invoice',
        'date_invoice': lambda *a: time.strftime('%Y-%m-%d'),
        'state': lambda *a: 'draft',
        'journal_id': _get_journal,
    }

    def unlink(self, cr, uid, ids):
        """Delete invoices; only 'draft' and 'cancel' invoices may be deleted.

        Raises osv.except_osv as soon as one invoice is in another state, in
        which case nothing is deleted.
        """
        invoices = self.read(cr, uid, ids, ['state'])
        unlink_ids = []
        for t in invoices:
            if t['state'] in ('draft', 'cancel'):
                unlink_ids.append(t['id'])
            else:
                raise osv.except_osv('Invalid action !', 'Cannot delete invoice(s) which are already opened or paid !')
        osv.osv.unlink(self, cr, uid, unlink_ids)
        return True

#   def get_invoice_address(self, cr, uid, ids):
#       res = self.pool.get('res.partner').address_get(cr, uid, [part], ['invoice'])
#       return [{}]

    def onchange_partner_id(self, cr, uid, ids, type, part):
        """Return the values to set on the form when the partner changes.

        The result has the form ``{'value': {...}}`` and carries the contact
        and invoice addresses, the receivable or payable account (depending
        on the invoice type) and, when the partner has one, its payment term.
        """
        invoice_addr_id = False
        contact_addr_id = False
        if part:
            res = self.pool.get('res.partner').address_get(cr, uid, [part], ['contact','invoice'])
            contact_addr_id = res['contact']
            invoice_addr_id = res['invoice']
        if type in ('out_invoice','out_refund'):
            acc_id = ir.ir_get(cr, uid, 'meta', 'account.receivable', [('res.partner',part)])[0][2]
        else:
            acc_id = ir.ir_get(cr, uid, 'meta', 'account.payable', [('res.partner',part)])[0][2]
        result = {'value': {
            'address_contact_id': contact_addr_id,
            'address_invoice_id': invoice_addr_id,
            'account_id': acc_id,
        }}
        if part:
            pt = self.pool.get('res.partner').browse(cr, uid, part).payment_term
            if pt:
                result['value']['payment_term'] = pt.id
        return result

    def onchange_type(self, cr, uid, ids, ftype):
        """Return the journal to set on the form when the invoice type changes."""
        return {'value' : {'journal_id' : self._get_journal(cr, uid, {'type' : ftype})}}

    # go from canceled state to draft state
    def action_cancel_draft(self, cr, uid, ids, *args):
        """Set the invoices back to 'draft' and create a workflow for each."""
        self.write(cr, uid, ids, {'state':'draft'})
        wf_service = netsvc.LocalService("workflow")
        for inv_id in ids:
            wf_service.trg_create(uid, 'account.invoice', inv_id, cr)
        return True

    # Workflow stuff
    #################

    # return the id of the first move line which has the same account than the invoice
    # whose id is in ids
    def move_line_id_payment_get(self, cr, uid, ids, *args):
        """Return ``(move_line_id,)`` for the first matching line, else False.

        A line matches when it belongs to the move of one of the invoices and
        uses the same account as that invoice.
        """
        ml = self.pool.get('account.move.line')
        for inv in self.read(cr, uid, ids, ['move_id','account_id']):
            if inv['move_id']:
                move_line_id = ml.search(cr, uid, [('move_id','=',inv['move_id'][0])])
                for line in ml.read(cr, uid, move_line_id, ['account_id']):
                    if line['account_id'] == inv['account_id']:
                        return (line['id'],)
        return False

    #
    # TODO: copy, with state='draft', number=''
    # def copy():
    #   pass
    #

    def test_paid(self, cr, uid, ids, *args):
        """Return True when the invoice's payment move line has a reconcile_id."""
        res = self.move_line_id_payment_get(cr, uid, ids)
        if not res:
            return False
        cr.execute('select reconcile_id from account_move_line where id=%d', (res[0],))
        row = cr.fetchone()
        return bool(row and row[0])

    def action_move_create(self, cr, uid, ids, *args):
        """Create an account move for every given invoice that has none yet.

        The move gets one line per invoice line, one per tax line, and one
        'Total' line on the invoice account for each payment-term instalment
        (or a single one when the invoice has no payment term). The invoice
        is then linked to the new move and a partner event is logged.
        """
        if not ids:
            # "IN ()" is not valid SQL
            return True
        cr.execute('SELECT * FROM account_invoice WHERE move_id IS NULL AND id IN (' + _sql_id_list(ids) + ')')
        for inv in cr.dictfetchall():
            # one move line per invoice line
            iml = self.pool.get('account.invoice.line').move_line_get(cr, uid, inv['id'])
            # one move line per tax line
            iml += self.pool.get('account.invoice.tax').move_line_get(cr, uid, inv['id'])

            # create one move line for the total and possibly adjust the other lines amount
            total = 0
            for i in iml:
                if inv['type'] in ('out_invoice','in_refund'):
                    total += i['price']
                    i['price'] = -i['price']
                else:
                    total -= i['price']
            acc_id = inv['account_id']

            if inv['payment_term']:
                totlines = self.pool.get('account.payment.term').compute(cr, uid, inv['payment_term'], total)
                for t in totlines:
                    iml.append({
                        'type': 'dest',
                        'name': 'Total',
                        'price': t[1],
                        'date_maturity': t[0],
                        'account_id': acc_id,
                    })
            else:
                iml.append({
                    'type': 'dest',
                    'name': 'Total',
                    'price': total,
                    'account_id': acc_id,
                })

            date = inv['date_invoice']
            part = inv['partner_id']
            # a positive price is booked as debit, a negative one as credit
            line = [(0, 0, {
                'date': date,
                'date_maturity': x.get('date_maturity', False),
                'partner_id': part,
                'name': x['name'],
                'debit': x['price']>0 and x['price'],
                'credit': x['price']<0 and -x['price'],
                'account_id': x['account_id'],
            }) for x in iml]

            # create the move and all its lines
            invtype = dict(self._columns['type'].selection)[inv['type']]
            name = '%s, %s' % (invtype, inv['name'])
            # NOTE(review): the journal is re-derived from the invoice type;
            # the invoice's own journal_id is not used here -- confirm this
            # is intended.
            move = {
                'name': name,
                'line_id': line,
                'journal_id': self._get_journal(cr, uid, {'type' : inv['type']}),
            }
            move_id = self.pool.get('account.move').create(cr, uid, move)
            # make the invoice point to that move
            self.write(cr, uid, [inv['id']], {'move_id':move_id})
        self._log_event(cr, uid, ids)
        return True

    def action_number(self, cr, uid, ids, *args):
        """Give a number to invoices lacking one and rename their move.

        The number is taken from the 'account.invoice.<type>' sequence. When
        the invoice has a move, the move name is rebuilt from the invoice
        type label, the number and the invoice name.
        """
        if not ids:
            # "IN ()" is not valid SQL
            return True
        cr.execute('SELECT id,type,move_id,number,name FROM account_invoice WHERE id IN (' + _sql_id_list(ids) + ')')
        for (inv_id, inv_type, move_id, number, name) in cr.fetchall():
            if not number:
                number = self.pool.get('ir.sequence').get(cr, uid, 'account.invoice.' + inv_type)
                cr.execute('UPDATE account_invoice SET number=%s WHERE id=%d', (number, inv_id))
            if move_id:
                invtype = dict(self._columns['type'].selection)[inv_type]
                # TODO: do the [:60] truncation in Python rather than in SQL
                name = '%s %s, %s' % (invtype, number or '', name)
                cr.execute('UPDATE account_move SET name=substring(%s,1,60) WHERE id=%d', (name, move_id))
        return True

    def action_cancel(self, cr, uid, ids, *args):
        """Delete the moves of the invoices and set the invoices to 'cancel'."""
        invoices = self.read(cr, uid, ids, ['move_id'])
        for i in invoices:
            if i['move_id']:
                # delete the move this invoice was pointing to
                # Note that the corresponding move_lines and move_reconciles will be automatically deleted too
                self.pool.get('account.move').unlink(cr, uid, [i['move_id'][0]])
        self.write(cr, uid, ids, {'state':'cancel', 'move_id':False})
        self._log_event(cr, uid, ids, -1.0, 'Cancel Invoice')
        return True

    ###################

    def list_distinct_taxes(self, cr, uid, ids):
        """Return one ``{'name': ...}`` dict per distinct tax line name."""
        invoices = self.browse(cr, uid, ids)
        taxes = {}
        for inv in invoices:
            for tax in inv.tax_line:
                if tax['name'] not in taxes:
                    taxes[tax['name']] = {'name': tax['name']}
        return taxes.values()

    def _log_event(self, cr, uid, ids, factor=1.0, name='Open Invoice'):
        """Create a partner event for each invoice; return how many were read.

        The untaxed line total multiplied by ``factor`` is recorded as
        planned cost for supplier documents and as planned revenue otherwise.
        Events are only created when the 'invoice_open' event type check
        succeeds.
        """
        invs = self.read(cr, uid, ids, ['type','partner_id'])
        for inv in invs:
            part = inv['partner_id'] and inv['partner_id'][0]
            pc = pr = 0.0
            cr.execute('select sum(quantity*price_unit) from account_invoice_line where invoice_id=%d', (inv['id'],))
            total = cr.fetchone()[0] or 0
            if inv['type'] in ('in_invoice','in_refund'):
                partnertype = 'supplier'
                eventtype = 'purchase'
                pc = total*factor
            else:
                partnertype = 'customer'
                eventtype = 'sale'
                pr = total*factor
            if self.pool.get('res.partner.event.type').check(cr, uid, 'invoice_open'):
                self.pool.get('res.partner.event').create(cr, uid, {
                    'name': 'Invoice: '+name,
                    'som': False,
                    'description': name+' '+str(inv['id']),
                    'document': name,
                    'partner_id': part,
                    'date': time.strftime('%Y-%m-%d %H:%M:%S'),
                    'canal_id': False,
                    'user_id': uid,
                    'partner_type': partnertype,
                    'probability': 1.0,
                    'planned_revenue': pr,
                    'planned_cost': pc,
                    'type': eventtype,
                })
        return len(invs)

    def name_search(self, cr, user, name, args=None, operator='ilike', context=None):
        """Search invoices by exact number first, then by name with ``operator``.

        ``args`` is an optional extra search domain appended to both searches.
        Returns the ``name_get`` result for the matching ids.
        """
        if args is None:
            args = []
        ids = []
        if name:
            ids = self.search(cr, user, [('number','=',name)] + args)
        if not ids:
            ids = self.search(cr, user, [('name',operator,name)] + args)
        return self.name_get(cr, user, ids)

    def refund(self, cr, uid, ids):
        """Create a draft counterpart document for each invoice; return new ids.

        Invoices become refunds and refunds become invoices. Invoice lines
        and manual tax lines are copied; the new document is dated today and
        has no number.
        """
        invoices = self.read(cr, uid, ids, ['name', 'type', 'number', 'reference', 'project_id', 'comment', 'date_due', 'partner_id', 'address_contact_id', 'address_invoice_id', 'partner_contact', 'partner_insite', 'partner_ref', 'payment_term', 'account_id','invoice_line','tax_line'])

        type_dict = {
            'out_invoice': 'out_refund', # Customer Invoice
            'in_invoice': 'in_refund',   # Supplier Invoice
            'out_refund': 'out_invoice', # Customer Refund
            'in_refund': 'in_invoice',   # Supplier Refund
        }
        line_obj = self.pool.get('account.invoice.line')
        tax_obj = self.pool.get('account.invoice.tax')

        new_ids = []
        for invoice in invoices:
            del invoice['id']

            invoice_lines = line_obj.read(cr, uid, invoice['invoice_line'])
            invoice_lines = _lines_for_create(invoice_lines)

            # only manual tax lines are copied
            tax_lines = tax_obj.read(cr, uid, invoice['tax_line'])
            tax_lines = [l for l in tax_lines if l['manual']]
            tax_lines = _lines_for_create(tax_lines)

            invoice.update({
                'type': type_dict[invoice['type']],
                'date_invoice': time.strftime('%Y-%m-%d'),
                'state': 'draft',
                'number': False,
                'invoice_line': invoice_lines,
                'tax_line': tax_lines
            })
            # read() values for many2one fields are reduced to the bare id
            # before being handed to create()
            for field in ('address_contact_id', 'address_invoice_id', 'partner_id', 'project_id', 'account_id', 'payment_term'):
                invoice[field] = _many2one_id(invoice[field])
            # create the new invoice
            new_ids.append(self.create(cr, uid, invoice))
        return new_ids
account_invoice()


class account_invoice_line(osv.osv):
    """A line of an invoice: a quantity of something at a unit price."""

    def _amount_line(self, cr, uid, ids, prop, unknow_none, unknow_dict):
        """Return ``{line_id: price_unit * quantity}``."""
        res = {}
        for line in self.browse(cr, uid, ids):
            res[line.id] = line.price_unit * line.quantity
        return res

    _name = "account.invoice.line"
    _description = "Invoice line"
    _columns = {
        'name': fields.char('Description', size=64, required=True),
        'invoice_id': fields.many2one('account.invoice', 'Invoice Ref', ondelete='cascade'),
        'uos_id': fields.many2one('account.uos', 'Unit of Sale', ondelete='set null'),
        'account_id':fields.many2one('account.account', 'Source Account', required=True, domain=[('type','<>','view')]),
        'price_unit': fields.float('Unit Price', required=True),
        'price_subtotal': fields.function(_amount_line, method=True, string='Subtotal'),
        'quantity': fields.float('Quantity', required=True),
        'invoice_line_tax_id': fields.many2many('account.tax', 'account_invoice_line_tax', 'invoice_line_id', 'tax_id', 'Taxes'),
    }
    _defaults = {
        'quantity': lambda *a: 1
    }

    def move_line_get(self, cr, uid, invoice_id):
        """Return the move line values for the lines of one invoice.

        As a side effect the automatic (non manual) tax lines of the invoice
        are deleted and recreated from the taxes set on its lines, grouped by
        tax and account.
        """
        inv_type = self.pool.get('account.invoice').read(cr, uid, [invoice_id], ['type'])[0]['type']
        tax_obj = self.pool.get('account.tax')

        res = []
        tax_grouped = {}
        cr.execute('SELECT * FROM account_invoice_line WHERE invoice_id=%d', (invoice_id,))
        for line in cr.dictfetchall():
            res.append({
                'type': 'src',
                'name': line['name'],
                'price_unit': line['price_unit'],
                'quantity': line['quantity'],
                'price': round(line['quantity']*line['price_unit'], 2),
                'account_id': line['account_id'],
            })
            cr.execute('SELECT tax_id FROM account_invoice_line_tax WHERE invoice_line_id=%d', (line['id'],))
            for (tax_id,) in cr.fetchall():
                c = tax_obj.compute(cr, uid, [tax_id], line['price_unit'], line['quantity'])[0]
                if inv_type in ('out_invoice','in_refund'):
                    c['account_id'] = c['account_collected_id']
                else:
                    c['account_id'] = c['account_paid_id']
                # amounts of the same tax on the same account are summed
                t = (c['id'], c['account_id'])
                if t not in tax_grouped:
                    tax_grouped[t] = c
                else:
                    tax_grouped[t]['amount'] += c['amount']

        # delete automatic tax lines for this invoice
        cr.execute("DELETE FROM account_invoice_tax WHERE NOT manual AND invoice_id=%d", (invoice_id,))

        # (re)create them
        ait = self.pool.get('account.invoice.tax')
        for t in tax_grouped.values():
            ait.create(cr, uid, {
                'invoice_id': invoice_id,
                'name': t['name'],
                'account_id': t['account_id'][0],
                'amount': t['amount'],
                'manual': False
            })
        return res
account_invoice_line()


class account_invoice_tax(osv.osv):
    """A tax amount on an invoice, entered manually or computed from lines."""

    _name = "account.invoice.tax"
    _description = "Invoice Tax"
    _columns = {
        'invoice_id': fields.many2one('account.invoice', 'Invoice Line', ondelete='cascade'),
        'name': fields.char('Tax Description', size=64, required=True),
        'account_id': fields.many2one('account.account', 'Taxe Account', required=True, domain=[('type','<>','view'),('type','<>','income')]),
        'amount': fields.float('Amount', digits=(16,2)),
        'manual': fields.boolean('Manual'),
    }
    _defaults = {
        'manual': lambda *a: 1,
    }

    def move_line_get(self, cr, uid, invoice_id):
        """Return the move line values for the tax lines of one invoice."""
        res = []
        cr.execute('SELECT * FROM account_invoice_tax WHERE invoice_id=%d', (invoice_id,))
        for t in cr.dictfetchall():
            res.append({
                'type': 'tax',
                'name': t['name'],
                'price_unit': t['amount'],
                'quantity': 1,
                'price': t['amount'] or 0.0,
                'account_id': t['account_id'],
            })
        return res
account_invoice_tax()