# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany). # Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE). # # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Models steps/actions/transforms prototypes. A prototype is basically used to select inputs elements and to check outputs :version: $Revision:$ :author: Logilab :copyright: 2001-2005 LOGILAB S.A. (Paris, FRANCE) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com :type C_STEP: int :var C_STEP: constant for the step level :type C_PLAN: int :var C_PLAN: constant for the plan level :type C_PARENT: int :var C_PARENT: constant for the parent plan level :type C_MEM: int :var C_MEM: constant for the memory level :type FROM_CONTEXT: dict :var FROM_CONTEXT: dictionary mapping literal string values to integer constants for the from_context attribute :type TO_CONTEXT: dict :var TO_CONTEXT: dictionary mapping literal string values to integer constants for the to_context attribute :type REV_CONTEXT: dict :var REV_CONTEXT: dictionary mapping integer constants to literal string values """ __revision__ = "$Id: prototype.py,v 1.19 2003/03/18 17:50:29 syt Exp $" __docformat__ = "restructuredtext en" from copy import copy from sets import Set from narval.public import NO_NS, AL_NS, match_expression from narval.reader import BadAlElement from narval.element import NSAttributesElement, NSAttribute, DescriptionableMixin from narval.xml_handlers import BaseXMLHandler, DescriptionHandler from narval.serialutils import yn_value, yn_rev_value, context_value, \ context_rev_value, C_STEP, C_MEM from narval.tags import is_tagged class PrototypeException(Exception) : """raised on input / output error""" class PrototypeHandler(BaseXMLHandler): """handle the prototype description part for action / recipe Don't need to handle namespaces here since since sax events are propagated here only if they are in the correct NS """ def __init__(self, root, ns_context, locator): super(PrototypeHandler, self).__init__(root, ns_context, locator) self._stack = [root] self.descr_hdlr = None self.new = None def start_element(self, name, attrs): """SAX callback: start a new xml node :type name: tuple :param name: the tag name as a tuple (uri, name) :type attrs: dict :param attrs: the node's attribute values, indexed by attribute's name as a tuple (uri, name) """ elmt = local = name[1] if local == 'match': self.new = True elif local == 'description': self.descr_hdlr = DescriptionHandler( self._stack[-1], attrs.get((None, 'lang'), 'en')) elif local == 'input': elmt = InputEntry() elmt.init_attrs(attrs) self._stack[-1].prototype.add_in_entry(elmt) elif local == 'output': elmt = OutputEntry() elmt.init_attrs(attrs) self._stack[-1].prototype.add_out_entry(elmt) elif local == 'condition': elmt = Condition() elmt.init_attrs(attrs) self._stack[-1].add_condition(elmt) self._stack.append(elmt) def end_element(self, name): """SAX callback: close a xml node :type name: tuple :param name: the tag name as a tuple (uri, name) """ self._stack.pop() if name[1] == 'match': # now, top element is a PrototypeEntry object # => precompile match entry = self._stack[-1] last_match = entry.matches[-1] try: entry.cmatches.append(compile(last_match, 'prototype.py', 'eval')) except SyntaxError, exc: err_prefix = '%s: ' % (exc.msg) msg = '\n%s%s\n%s^' % (err_prefix, exc.text, ' ' * (len(err_prefix) + exc.offset)) raise BadAlElement(msg, skip = True) def characters(self, content): """SAX callback: get some (non empty) string :type content: unicode :param content: the non empty string to hold """ if self._stack[-1] == 'match': if self.new: self._stack[-2].matches.append(content) self.new = False else: self._stack[-2].matches[-1] += content elif self.descr_hdlr: self.descr_hdlr.characters(content) class PrototypeEntry(NSAttributesElement, DescriptionableMixin): """A input or output prototype entry. You can use the subscription notation to access to / set the attributes of the prototype (match, optional...) :ivar matches: list of expressions to be matched by the condition :ivar list: flag indicating wether the condition accepts more than 1 element """ list = NSAttribute(NO_NS, False, yn_value, yn_rev_value) def __init__(self): super(PrototypeEntry, self).__init__() self.owner = None self.matches = [] self.cmatches = [] def __repr__(self): return '\n'.join(self.matches) def clone(self): """make a proper copy of this prototype object and return it""" entry = copy(self) entry.clone_attrs(self._ns_attrs, True) entry.matches = self.matches[:] entry.cmatches = self.cmatches[:] return entry def children_as_xml(self, encoding='UTF-8'): """return a xml string for matchs :type encoding: str :param encoding: the encoding to use in the returned string :rtype: str :return: the entry as an XML snippet """ result = [] descr_xml = self.description_as_xml(encoding) descr_xml and result.append(descr_xml) for match in self.matches: result.append('%s' % match.encode(encoding)) return '\n'.join(result) class Condition(PrototypeEntry): """a transition's condition. Attributes : :ivar use: flag indicating wether an element triggering the condition may trigger it more than once (use==False) :ivar from_context: string controlling the element searching process :ivar to_context: string controlling the element searching process :type valid_elmts: list(AlElement) :ivar valid_elmts: list of elements matching the condition """ __xml_element__ = (AL_NS, 'condition') use = NSAttribute(NO_NS, False, yn_value, yn_rev_value) from_context = NSAttribute(NO_NS, C_STEP, context_value, context_rev_value) to_context = NSAttribute(NO_NS, C_MEM, context_value, context_rev_value) def __init__(self): super(Condition, self).__init__() self.matching_elmts = Set() self._satisfied_matches = Set() def match_elements(self, elements, context=None): """match elements agains the condition. Return true when each match of the condition has been satisfied. :type elements: iterable :param elements: elements to filter against the prototype :type context: dict or None :param elements: optional context used to evaluate match expressions :rtype: list :return: elements satisfying the condition if it's now fully satisfied """ context = context or {} use = self.use elements = [elmt for elmt in elements if not (elmt.outdated or (use and is_tagged(elmt, self.owner)))] for i, cmatch in enumerate(self.cmatches): if i in self._satisfied_matches: continue for element in elements: context['elmt'] = element if match_expression(cmatch, context): self._satisfied_matches.add(i) self.matching_elmts.add(element) # only return matching elements when each al:match has been satisfied if self.is_satisfied(): return self.matching_elmts return [] def is_satisfied(self): """return true if the condition is now satisfied (i.e. each match in the condition is satisfied) """ return len(self._satisfied_matches) == len(self.matches) class InputEntry(PrototypeEntry): """a action or step input prototype. Attributes : :ivar id: the input id, required and unique for the step / action :ivar optional: flag indicating wether the input is optional or required :ivar use: flag indicating wether an element triggering the input may trigger it more than once (use==False) :type from_context: str :ivar from_context: identifier of the level from which we should begin to look for matching elements :type to_context: str :ivar to_context: identifier of the level until which we should stpop looking for matching elements """ __xml_element__ = (AL_NS, 'input') id = NSAttribute(NO_NS, None, str, str) use = NSAttribute(NO_NS, False, yn_value, yn_rev_value) from_context = NSAttribute(NO_NS, C_STEP, context_value, context_rev_value) to_context = NSAttribute(NO_NS, C_MEM, context_value, context_rev_value) optional = NSAttribute(NO_NS, False, yn_value, yn_rev_value) def match_elements(self, elements, context=None): """return elements matching the prototype :type elements: iterable :param elements: elements to filter against the prototype :rtype: list :return: elements among the given ones satisfying the prototype """ match_elmts = [] use, cmatches = self.use, self.cmatches context = context or {} for element in elements: if not element.outdated and ( (not use) or (use and not is_tagged(element, self.owner))): context['elmt'] = element for cmatch in cmatches: if not match_expression(cmatch, context): break else: match_elmts.append(element) return match_elmts class OutputEntry(PrototypeEntry): """a action or step output prototype. Attributes : :ivar id: the output id, required and unique for the step / action :ivar optional: flag indicating wether the output is optional or required :ivar outdates: an optional string referencing and input id to outdate elements from this entry """ __xml_element__ = (AL_NS, 'output') id = NSAttribute(NO_NS, None, str, str) outdates = NSAttribute(NO_NS, '', str, str) optional = NSAttribute(NO_NS, False, yn_value, yn_rev_value) def match_elements(self, elements, context=None): """return elements matching the prototype :type elements: iterable :param elements: elements to filter against the prototype :rtype: list :return: elements among the given ones satisfying the prototype """ match_e = [] context = context or {} for element in elements: context['elmt'] = element for cmatch in self.cmatches: if not match_expression(cmatch, context): break else: match_e.append(element) return match_e class Prototype: """A step / action prototype is a list of input / output prototypes :type overload: `Prototype` or None :ivar overload: optional overloaded prototype (i.e. when we are a step prototype refining its target's prototype """ def __init__(self) : self._inputs, self._outputs = [], [] self.overload = None self.owner = None self.in_mixed, self.out_mixed = None, None def as_xml(self, encoding='UTF-8'): """return the input / output prototypes as xml :type encoding: str :param encoding: the encoding to use in the returned string :rtype: str :return: the prototype as an XML snippet """ result = [] for proto in self._inputs + self._outputs: result.append(proto.as_xml(encoding, namespaces_def=False)) return '\n'.join(result) def set_owner(self, owner) : """set owner on this prototype and on this prototype's inputs :type owner: `narval.plan.PlanStep` :param owner: the step owning this prototype """ self.owner = owner for input_proto in self._inputs: input_proto.owner = owner def prepare(self, overloaded): """prepare a step prototype :type overloaded: `Prototype` :param overloaded: the step's target prototype """ self.overload = overloaded def add_in_entry(self, entry): """add an input prototype entry :type entry: `InputEntry` :param id: the prototype entry object """ self._inputs.append(entry) def add_out_entry(self, entry): """add an output prototype entry :type entry: `OutputEntry` :param id: the prototype entry object """ self._outputs.append(entry) def input_prototype(self): """return the merged input prototypes :rtype: list :return: the list of merged input prototypes (i.e. step prototype + target prototype) """ if self.in_mixed: return self.in_mixed if self.overload: newlist = merge_input(self.overload._inputs, self._inputs) self.in_mixed = newlist for input_proto in newlist: input_proto.owner = self.owner else: newlist = [elmt for elmt in self._inputs if elmt.matches or not elmt.optional] return newlist def output_prototype(self): """return the merged output prototypes :rtype: list :return: the list of merged output prototypes (i.e. step prototype + target prototype)""" if self.out_mixed: return self.out_mixed if self.overload: newlist = merge_output(self.overload._outputs, self._outputs) self.out_mixed = newlist else: newlist = [elmt for elmt in self._outputs if elmt.matches or not elmt.optional] return newlist def check(self, target): """check this is a valid prototype :type target: `narval.action.ActionElement` :param target: the step's target :raise `PrototypeException`: if this is not a valid prototype """ target_in_proto = target.prototype._inputs target_out_proto = target.prototype._outputs for step_in_proto in self._inputs: self.check_entry(target_in_proto, step_in_proto) for step_out_proto in self._outputs: self.check_entry(target_out_proto, step_out_proto) errors = [] for entry in target_in_proto: if entry.id is None: errors.append('* input without id found in action %s.%s' % (target.group, target.name)) for entry in target_out_proto: if entry.id is None: errors.append('* output without id found in action %s.%s' % (target.group, target.name)) errors += _check_entries(merge_input(target_in_proto, self._inputs)) errors += _check_entries(merge_output(target_out_proto, self._outputs)) if errors: raise PrototypeException(' \n' + ' \n'.join(errors)) def check_entry(self, entries, entry) : """check is in :type entries: list :param entries: a list of elements :type entry: narval.public.ALElement :param entry: the element to check :raise `PrototypeException`: if the entry is not in the entries set """ e_id = entry.id if e_id is None: raise PrototypeException('entry without id found') for old in entries: if old.id == e_id: break else : raise PrototypeException('entry with id %s not found' % e_id) def _check_entries(entries): """check prototype entries :type entries: list :param entries: list of prototype entries to check :rtype: list :return: a list of strings describing errors (empty list means no error) """ errors, ids_dict = [], {} for entry in entries: if not entry.matches and not entry.optional: errors.append('* input prototype error: no match for %s' % entry.id) if ids_dict.has_key(entry.id): errors.append('* input prototype error: duplicate id %s' % entry.id) else: ids_dict[entry.id] = 1 return errors def merge_input(target_proto, step_proto) : """merge input step prototype with target prototype :type target_proto: list :param target_proto: list of target prototype input entries :type step_proto: list :param step_proto: list of step prototype input entries :rtype: list :return: the list of merged prototype entries """ newlist = [] for old in target_proto: for new in step_proto: if old.id == new.id : merged = old.clone() # the "list" attribute is not overidable # FIXME: check that "step.list == action.list" in check_recipe # FIXME: should not do this if the default value is used ! if old.optional and not new.optional: merged.optional = False if not old.use and new.use: merged.use = True for match_o, cmatch_o in zip(new.matches, new.cmatches): if match_o not in old.matches: merged.matches.append(match_o) merged.cmatches.append(cmatch_o) merged.from_context = new.from_context merged.to_context = new.to_context #if merged.matches or not merged.optional: newlist.append(merged) break else: #if old.matches or not old.optional: newlist.append(old) return newlist def merge_output(target_proto, step_proto) : """merge output step prototype with target prototype :type target_proto: list :param target_proto: list of target prototype output entries :type step_proto: list :param step_proto: list of step prototype output entries :rtype: list :return: the list of merged prototype entries """ newlist = [] for old in target_proto: for new in step_proto: if old.id == new.id : merged = old.clone() # FIXME: check that "step.list == action.list" in check_recipe # and step.optional == action.optional if not old.outdates and new.outdates: merged.outdates = new.outdates for match_o, cmatch_o in zip(new.matches, new.cmatches): if match_o not in old.matches: merged.matches.append(match_o) merged.cmatches.append(cmatch_o) if merged.matches or not merged.optional: newlist.append(merged) break else: #if old.matches or not old.optional: newlist.append(old) return newlist