# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany).
# Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE).
#
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Models steps/actions/transforms prototypes.
A prototype is basically used to select inputs elements and to check outputs
:version: $Revision:$
:author: Logilab
:copyright:
2001-2005 LOGILAB S.A. (Paris, FRANCE)
2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
http://www.logilab.fr/ -- mailto:contact@logilab.fr
http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
:type C_STEP: int
:var C_STEP: constant for the step level
:type C_PLAN: int
:var C_PLAN: constant for the plan level
:type C_PARENT: int
:var C_PARENT: constant for the parent plan level
:type C_MEM: int
:var C_MEM: constant for the memory level
:type FROM_CONTEXT: dict
:var FROM_CONTEXT:
dictionary mapping literal string values to integer constants for the
from_context attribute
:type TO_CONTEXT: dict
:var TO_CONTEXT:
dictionary mapping literal string values to integer constants for the
to_context attribute
:type REV_CONTEXT: dict
:var REV_CONTEXT: dictionary mapping integer constants to literal string values
"""
# Keyword-expanded revision identifier of this file.
__revision__ = "$Id: prototype.py,v 1.19 2003/03/18 17:50:29 syt Exp $"
# Markup (and natural language) used by the docstrings of this module.
__docformat__ = "restructuredtext en"
from copy import copy
from sets import Set
from narval.public import NO_NS, AL_NS, match_expression
from narval.reader import BadAlElement
from narval.element import NSAttributesElement, NSAttribute, DescriptionableMixin
from narval.xml_handlers import BaseXMLHandler, DescriptionHandler
from narval.serialutils import yn_value, yn_rev_value, context_value, \
context_rev_value, C_STEP, C_MEM
from narval.tags import is_tagged
class PrototypeException(Exception):
    """Error raised when an input or output prototype is not valid."""
class PrototypeHandler(BaseXMLHandler):
    """SAX handler for the prototype description part of an action / recipe.

    There is no need to handle namespaces here since sax events are
    propagated to this handler only if they are in the correct namespace.

    :ivar _stack:
      stack of the nodes being parsed; holds the root object, the entry
      objects created for input / output / condition nodes and the plain
      local name (a string) for any other node
    :ivar descr_hdlr:
      handler receiving character data while a description node is open,
      None otherwise
    :ivar new:
      true when a match node has been opened and no character data has been
      received for it yet
    """

    def __init__(self, root, ns_context, locator):
        super(PrototypeHandler, self).__init__(root, ns_context, locator)
        self._stack = [root]
        self.descr_hdlr = None
        self.new = None

    def start_element(self, name, attrs):
        """SAX callback: start a new xml node

        Whatever the node is, something is pushed on the stack so that
        `end_element` can pop unconditionally.

        :type name: tuple
        :param name: the tag name as a tuple (uri, name)

        :type attrs: dict
        :param attrs:
          the node's attribute values, indexed by attribute's name as a tuple
          (uri, name)
        """
        elmt = local = name[1]
        if local == 'match':
            # the next characters() call starts a new expression
            self.new = True
        elif local == 'description':
            self.descr_hdlr = DescriptionHandler(
                self._stack[-1], attrs.get((None, 'lang'), 'en'))
        elif local == 'input':
            elmt = InputEntry()
            elmt.init_attrs(attrs)
            self._stack[-1].prototype.add_in_entry(elmt)
        elif local == 'output':
            elmt = OutputEntry()
            elmt.init_attrs(attrs)
            self._stack[-1].prototype.add_out_entry(elmt)
        elif local == 'condition':
            elmt = Condition()
            elmt.init_attrs(attrs)
            self._stack[-1].add_condition(elmt)
        self._stack.append(elmt)

    def end_element(self, name):
        """SAX callback: close a xml node

        Closing a match node precompiles the expression which has just been
        collected; closing a description node detaches the description
        handler.

        :type name: tuple
        :param name: the tag name as a tuple (uri, name)

        :raise `BadAlElement`:
          if the closed node is a match whose expression is empty or is not a
          valid python expression
        """
        self._stack.pop()
        local = name[1]
        if local == 'description':
            # stop feeding the description with the text of following nodes
            self.descr_hdlr = None
        elif local == 'match':
            # now, top element is a PrototypeEntry object
            # => precompile match
            entry = self._stack[-1]
            if self.new:
                # no character data received: there is nothing to compile and
                # entry.matches[-1] would be a previous expression (or missing)
                self.new = False
                raise BadAlElement('\nempty match expression', skip=True)
            last_match = entry.matches[-1]
            try:
                cmatch = compile(last_match, 'prototype.py', 'eval')
            except SyntaxError:
                # sys.exc_info() is used to fetch the exception since it
                # parses the same way on every python version
                import sys
                exc = sys.exc_info()[1]
                # text / offset are not always provided by the compiler
                text = exc.text
                if text is None:
                    text = last_match
                offset = exc.offset or 0
                err_prefix = '%s: ' % (exc.msg)
                msg = '\n%s%s\n%s^' % (err_prefix, text,
                                       ' ' * (len(err_prefix) + offset))
                raise BadAlElement(msg, skip=True)
            entry.cmatches.append(cmatch)

    def characters(self, content):
        """SAX callback: get some (non empty) string

        Text found in a match node is accumulated as the last expression of
        the enclosing entry; other text is forwarded to the description
        handler when a description is open.

        :type content: unicode
        :param content: the non empty string to hold
        """
        if self._stack[-1] == 'match':
            if self.new:
                self._stack[-2].matches.append(content)
                self.new = False
            else:
                # the parser may deliver a single text node in several chunks
                self._stack[-2].matches[-1] += content
        elif self.descr_hdlr:
            self.descr_hdlr.characters(content)
class PrototypeEntry(NSAttributesElement, DescriptionableMixin):
    """Base class of input / output prototype entries and of conditions.

    :ivar owner: object owning this entry, None until it has been set
    :ivar matches: list of the source expressions attached to the entry
    :ivar cmatches: list of compiled expressions, parallel to `matches`
    :ivar list: flag indicating whether the entry accepts more than 1 element
    """

    list = NSAttribute(NO_NS, False, yn_value, yn_rev_value)

    def __init__(self):
        super(PrototypeEntry, self).__init__()
        self.matches = []
        self.cmatches = []
        self.owner = None

    def __repr__(self):
        # one source expression per line
        return '\n'.join(self.matches)

    def clone(self):
        """return a copy of this entry which has its own expression lists

        :rtype: `PrototypeEntry`
        :return: the new entry
        """
        duplicate = copy(self)
        duplicate.clone_attrs(self._ns_attrs, True)
        # copy() is shallow: give the duplicate independent lists
        duplicate.matches = self.matches[:]
        duplicate.cmatches = self.cmatches[:]
        return duplicate

    def children_as_xml(self, encoding='UTF-8'):
        """return the description and the matches of this entry as a string

        :type encoding: str
        :param encoding: the encoding to use in the returned string

        :rtype: str
        :return: the entry's children as an XML snippet, one item per line
        """
        # NOTE(review): each match is emitted as is, without any enclosing
        # element -- confirm this is the expected serialization
        parts = ['%s' % match.encode(encoding) for match in self.matches]
        descr_xml = self.description_as_xml(encoding)
        if descr_xml:
            parts.insert(0, descr_xml)
        return '\n'.join(parts)
class Condition(PrototypeEntry):
    """A transition's condition.

    :ivar use:
      when true, elements already tagged by the condition's owner are ignored,
      i.e. an element may not trigger the condition more than once
    :ivar from_context: context level controlling the element searching process
    :ivar to_context: context level controlling the element searching process

    :type matching_elmts: Set
    :ivar matching_elmts: elements which have satisfied a match so far

    :type _satisfied_matches: Set
    :ivar _satisfied_matches: indexes of the matches satisfied so far
    """
    __xml_element__ = (AL_NS, 'condition')

    use = NSAttribute(NO_NS, False, yn_value, yn_rev_value)
    from_context = NSAttribute(NO_NS, C_STEP, context_value, context_rev_value)
    to_context = NSAttribute(NO_NS, C_MEM, context_value, context_rev_value)

    def __init__(self):
        super(Condition, self).__init__()
        self.matching_elmts = Set()
        self._satisfied_matches = Set()

    def match_elements(self, elements, context=None):
        """match elements against the matches which are not satisfied yet

        The state is kept between calls: a match satisfied by a previous call
        is not evaluated again.

        :type elements: iterable
        :param elements: elements to filter against the condition

        :type context: dict or None
        :param context:
          optional context used to evaluate match expressions; the evaluated
          element is stored in it under the 'elmt' key

        :rtype: Set or list
        :return:
          the elements collected so far if each match of the condition is now
          satisfied, else an empty list
        """
        if not context:
            context = {}
        use = self.use
        # discard outdated elements and, if required, already used ones
        candidates = []
        for elmt in elements:
            if elmt.outdated:
                continue
            if use and is_tagged(elmt, self.owner):
                continue
            candidates.append(elmt)
        for index, compiled in enumerate(self.cmatches):
            if index not in self._satisfied_matches:
                # every candidate is tried, even once the match is satisfied
                for candidate in candidates:
                    context['elmt'] = candidate
                    if match_expression(compiled, context):
                        self._satisfied_matches.add(index)
                        self.matching_elmts.add(candidate)
        # only return matching elements when each al:match has been satisfied
        if not self.is_satisfied():
            return []
        return self.matching_elmts

    def is_satisfied(self):
        """return true if as many matches have been satisfied as the
        condition holds
        """
        return len(self.matches) == len(self._satisfied_matches)
class InputEntry(PrototypeEntry):
    """An action or step input prototype.

    :ivar id: the input id, required and unique for the step / action
    :ivar optional: flag indicating whether the input is optional or required
    :ivar use:
      when true, elements already tagged by the entry's owner are ignored,
      i.e. an element may not trigger the input more than once

    :type from_context: int
    :ivar from_context:
      identifier of the level from which we should begin to look for
      matching elements

    :type to_context: int
    :ivar to_context:
      identifier of the level until which we should stop looking for
      matching elements
    """
    __xml_element__ = (AL_NS, 'input')

    id = NSAttribute(NO_NS, None, str, str)
    use = NSAttribute(NO_NS, False, yn_value, yn_rev_value)
    from_context = NSAttribute(NO_NS, C_STEP, context_value, context_rev_value)
    to_context = NSAttribute(NO_NS, C_MEM, context_value, context_rev_value)
    optional = NSAttribute(NO_NS, False, yn_value, yn_rev_value)

    def match_elements(self, elements, context=None):
        """return the usable elements satisfying every match of the input

        :type elements: iterable
        :param elements: elements to filter against the prototype

        :type context: dict or None
        :param context:
          optional context used to evaluate match expressions; the evaluated
          element is stored in it under the 'elmt' key

        :rtype: list
        :return: elements among the given ones satisfying the prototype
        """
        selected = []
        use, cmatches = self.use, self.cmatches
        if not context:
            context = {}
        for candidate in elements:
            if candidate.outdated:
                continue
            if use and is_tagged(candidate, self.owner):
                # already consumed by our owner
                continue
            context['elmt'] = candidate
            for compiled in cmatches:
                if not match_expression(compiled, context):
                    break
            else:
                # no expression has been rejected
                selected.append(candidate)
        return selected
class OutputEntry(PrototypeEntry):
    """An action or step output prototype.

    :ivar id: the output id, required and unique for the step / action
    :ivar optional: flag indicating whether the output is optional or required
    :ivar outdates:
      an optional string referencing an input id to outdate elements from
      this entry
    """
    __xml_element__ = (AL_NS, 'output')

    id = NSAttribute(NO_NS, None, str, str)
    outdates = NSAttribute(NO_NS, '', str, str)
    optional = NSAttribute(NO_NS, False, yn_value, yn_rev_value)

    def match_elements(self, elements, context=None):
        """return the elements satisfying every match of the output

        :type elements: iterable
        :param elements: elements to filter against the prototype

        :type context: dict or None
        :param context:
          optional context used to evaluate match expressions; the evaluated
          element is stored in it under the 'elmt' key

        :rtype: list
        :return: elements among the given ones satisfying the prototype
        """
        if not context:
            context = {}
        accepted = []
        for candidate in elements:
            context['elmt'] = candidate
            for compiled in self.cmatches:
                if not match_expression(compiled, context):
                    break
            else:
                # no expression has been rejected
                accepted.append(candidate)
        return accepted
class Prototype:
    """A step / action prototype is a list of input / output prototypes

    :type overload: `Prototype` or None
    :ivar overload:
      optional overloaded prototype (i.e. when we are a step prototype refining
      its target's prototype)

    :ivar owner: the object owning this prototype, None until `set_owner`

    :type in_mixed: list or None
    :ivar in_mixed: cache of the merged input prototypes, None if not computed

    :type out_mixed: list or None
    :ivar out_mixed: cache of the merged output prototypes, None if not computed
    """

    def __init__(self):
        self._inputs, self._outputs = [], []
        self.overload = None
        self.owner = None
        self.in_mixed, self.out_mixed = None, None

    def as_xml(self, encoding='UTF-8'):
        """return the input / output prototypes as xml

        :type encoding: str
        :param encoding: the encoding to use in the returned string

        :rtype: str
        :return: the prototype as an XML snippet
        """
        result = []
        for proto in self._inputs + self._outputs:
            result.append(proto.as_xml(encoding, namespaces_def=False))
        return '\n'.join(result)

    def set_owner(self, owner):
        """set owner on this prototype and on this prototype's inputs

        :type owner: `narval.plan.PlanStep`
        :param owner: the step owning this prototype
        """
        self.owner = owner
        for input_proto in self._inputs:
            input_proto.owner = owner
        # merged inputs which have already been computed must follow as well,
        # else they would keep on referencing the previous owner
        if self.in_mixed is not None:
            for input_proto in self.in_mixed:
                input_proto.owner = owner

    def prepare(self, overloaded):
        """prepare a step prototype

        :type overloaded: `Prototype`
        :param overloaded: the step's target prototype
        """
        self.overload = overloaded
        # merged prototypes computed against a previous target are obsolete
        self.in_mixed, self.out_mixed = None, None

    def add_in_entry(self, entry):
        """add an input prototype entry

        :type entry: `InputEntry`
        :param entry: the prototype entry object
        """
        self._inputs.append(entry)

    def add_out_entry(self, entry):
        """add an output prototype entry

        :type entry: `OutputEntry`
        :param entry: the prototype entry object
        """
        self._outputs.append(entry)

    def input_prototype(self):
        """return the merged input prototypes

        When there is no overloaded prototype, the result is not cached and
        only holds our own entries which have a match or are not optional.

        :rtype: list
        :return:
          the list of merged input prototypes (i.e. step prototype + target
          prototype)
        """
        # compare with None: an empty merged list is a valid cached value
        if self.in_mixed is not None:
            return self.in_mixed
        if self.overload:
            newlist = merge_input(self.overload._inputs, self._inputs)
            self.in_mixed = newlist
            for input_proto in newlist:
                input_proto.owner = self.owner
        else:
            newlist = [elmt for elmt in self._inputs
                       if elmt.matches or not elmt.optional]
        return newlist

    def output_prototype(self):
        """return the merged output prototypes

        When there is no overloaded prototype, the result is not cached and
        only holds our own entries which have a match or are not optional.

        :rtype: list
        :return:
          the list of merged output prototypes (i.e. step prototype + target
          prototype)
        """
        # compare with None: an empty merged list is a valid cached value
        if self.out_mixed is not None:
            return self.out_mixed
        if self.overload:
            newlist = merge_output(self.overload._outputs, self._outputs)
            self.out_mixed = newlist
        else:
            newlist = [elmt for elmt in self._outputs
                       if elmt.matches or not elmt.optional]
        return newlist

    def check(self, target):
        """check this is a valid prototype

        :type target:
          `narval.action.ActionElement`
        :param target: the step's target

        :raise `PrototypeException`: if this is not a valid prototype
        """
        target_in_proto = target.prototype._inputs
        target_out_proto = target.prototype._outputs
        # each of our entries must refine an entry of the target; the first
        # offending entry raises immediately
        for step_in_proto in self._inputs:
            self.check_entry(target_in_proto, step_in_proto)
        for step_out_proto in self._outputs:
            self.check_entry(target_out_proto, step_out_proto)
        # remaining problems are collected and reported all together
        errors = []
        for entry in target_in_proto:
            if entry.id is None:
                errors.append('* input without id found in action %s.%s' %
                              (target.group, target.name))
        for entry in target_out_proto:
            if entry.id is None:
                errors.append('* output without id found in action %s.%s' %
                              (target.group, target.name))
        errors += _check_entries(merge_input(target_in_proto, self._inputs))
        errors += _check_entries(merge_output(target_out_proto, self._outputs))
        if errors:
            raise PrototypeException(' \n' + ' \n'.join(errors))

    def check_entry(self, entries, entry):
        """check the given entry has an id which is used by one of the entries

        :type entries: list
        :param entries: a list of prototype entries

        :type entry: `PrototypeEntry`
        :param entry: the entry to check

        :raise `PrototypeException`:
          if the entry has no id or if its id is not used in the entries list
        """
        e_id = entry.id
        if e_id is None:
            raise PrototypeException('entry without id found')
        for old in entries:
            if old.id == e_id:
                break
        else:
            raise PrototypeException('entry with id %s not found' % e_id)
def _check_entries(entries):
"""check prototype entries
:type entries: list
:param entries: list of prototype entries to check
:rtype: list
:return: a list of strings describing errors (empty list means no error)
"""
errors, ids_dict = [], {}
for entry in entries:
if not entry.matches and not entry.optional:
errors.append('* input prototype error: no match for %s' %
entry.id)
if ids_dict.has_key(entry.id):
errors.append('* input prototype error: duplicate id %s' %
entry.id)
else:
ids_dict[entry.id] = 1
return errors
def merge_input(target_proto, step_proto):
    """merge input step prototype with target prototype

    The result follows the target's entries order. Each returned entry is a
    copy, so the caller may freely modify it (e.g. set its owner) without
    altering the target prototype, which may be shared.

    :type target_proto: list
    :param target_proto: list of target prototype input entries

    :type step_proto: list
    :param step_proto: list of step prototype input entries

    :rtype: list
    :return: the list of merged prototype entries
    """
    newlist = []
    for old in target_proto:
        for new in step_proto:
            if old.id == new.id:
                merged = old.clone()
                # the "list" attribute is not overidable
                # FIXME: check that "step.list == action.list" in check_recipe
                # FIXME: should not do this if the default value is used !
                # the step may only make the entry more restrictive
                if old.optional and not new.optional:
                    merged.optional = False
                if not old.use and new.use:
                    merged.use = True
                # add the step's expressions the target doesn't already have
                for match_o, cmatch_o in zip(new.matches, new.cmatches):
                    if match_o not in old.matches:
                        merged.matches.append(match_o)
                        merged.cmatches.append(cmatch_o)
                merged.from_context = new.from_context
                merged.to_context = new.to_context
                #if merged.matches or not merged.optional:
                newlist.append(merged)
                break
        else:
            # not refined by the step: still return a copy since the target's
            # entry must not be modified through the merged list
            #if old.matches or not old.optional:
            newlist.append(old.clone())
    return newlist
def merge_output(target_proto, step_proto):
    """merge output step prototype with target prototype

    The result follows the target's entries order. A target entry which is
    not refined by the step is returned as is, else a refined copy is
    returned unless it's optional and has no match.

    :type target_proto: list
    :param target_proto: list of target prototype output entries

    :type step_proto: list
    :param step_proto: list of step prototype output entries

    :rtype: list
    :return: the list of merged prototype entries
    """
    merged_entries = []
    for target_entry in target_proto:
        # look for the first step entry refining this target entry
        refining = [step_entry for step_entry in step_proto
                    if target_entry.id == step_entry.id][:1]
        if not refining:
            #if old.matches or not old.optional:
            merged_entries.append(target_entry)
            continue
        step_entry = refining[0]
        merged = target_entry.clone()
        # FIXME: check that "step.list == action.list" in check_recipe
        #        and step.optional == action.optional
        if not target_entry.outdates and step_entry.outdates:
            merged.outdates = step_entry.outdates
        # add the step's expressions the target doesn't already have
        for expression, compiled in zip(step_entry.matches,
                                        step_entry.cmatches):
            if expression in target_entry.matches:
                continue
            merged.matches.append(expression)
            merged.cmatches.append(compiled)
        if merged.matches or not merged.optional:
            merged_entries.append(merged)
    return merged_entries