# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany). # Copyright (c) 2000-2005 LOGILAB S.A. (Paris, FRANCE). # # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """some elements used for the chat bot control :version: $Revision:$ :author: Logilab :copyright: 2000-2005 LOGILAB S.A. (Paris, FRANCE) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com """ __revision__ = "$Id:$" __docformat__ = "restructuredtext en" import md5 from base64 import encodestring import re from logilab.common.textutils import get_csv from narval.public import NO_NS, normalize_url from narval.element import ALElement, NSAttribute from narval.xml_handlers import OneLevelHandler from narval.serialutils import yn_value, yn_rev_value from narval.interfaces.base import IURL from narval.elements.base import FileElement def new_context(ref): """create a new context identifier as string :type ref: str :param ref: a string used to build the identifier (usually the the original jabber id) :rtype: str :return: a unique context identifier """ new_digest = repr(md5.new(str(ref)).digest()) return encodestring(new_digest) _DISCUSSIONS = {} def get_discussion(activator, key): key = (activator.host, key) try: return _DISCUSSIONS[key] except KeyError: element = DiscussionElement(context=new_context(key), user=activator.user) _DISCUSSIONS[key] = element return element class AccessRight: def __init__(self, attrs): self.cmd_name = attrs[(NO_NS, 'command')] self.usable_in_chat = yn_value(attrs.get((NO_NS, 'usable-in-group-chat'), 'yes')) self._access_list = '' self._compiled_access_list = None def characters(self, value): self._access_list += value def as_xml(self, encoding='UTF-8'): return '\ %s' % (self.cmd_name, yn_rev_value(self.usable_in_chat), self._access_list) def access_list(self): """return a list of identifier authorized for this access""" if self._compiled_access_list is None: self._compiled_access_list = [w.lower() for w in get_csv(self._access_list)] return self._compiled_access_list def __eq__(self, other): return (other.cmd_name == self.cmd_name and other.access_list() == self.access_list() and other.usable_in_chat == self.usable_in_chat) class BotConfigurationElement(ALElement): """an element to configure the (jabber for now) bot :type verbose: bool :attr verbose: be verbose about internal action :type mode: str :attr mode: the current chat bot mode (currently "default" or "metalog") :type min_threshold: float :attr min_threshold: the confidence level above which we are not sure the sentence belong to the NO category (i.e. don't log). 0 < min_threshold < max_threshold :type max_threshold: float :attr max_threshold: the confidence level under which we are not sure the sentence belong to the YES category (i.e. log). min_threshold < max_threshold < 1 """ __xml_element__ = (NO_NS, 'bot-configuration') __child_handler__ = OneLevelHandler subelements_classes = {(NO_NS, 'access-right'): AccessRight} subelements_attribute = 'acls' verbose = NSAttribute(NO_NS, False, yn_value, yn_rev_value) min_treshold = NSAttribute(NO_NS, None, float, str) max_treshold = NSAttribute(NO_NS, None, float, str) acls = [] _acls_dict = None def children_as_xml(self, encoding='UTF-8'): """return the XML representation of children in this element. :type encoding: str :param encoding: the encoding to use in the returned string :rtype: str :return: XML string representing the element """ result = [' %s' % access_right.as_xml(encoding) for access_right in self.acls] return '\n'.join(result) def acls_dict(self): """get a dictionary of access control list indexed by command name""" if self._acls_dict is None: self._acls_dict = {} for access_right in self.acls: self._acls_dict[access_right.cmd_name] = access_right return self._acls_dict def has_access_right(self, master_id, user, cmdname, groupchat=False): """check if the user has access to the given command :type master_id: str :param master_id: the master identifier :type user: str :param user: the user id :type cmdname: str :param cmdname: the name of the desired command :type groupchat: bool :param groupchat: flag indicating if the access is required from a groupchat or not special keywords for access right definition: * all (everybody) * myuser (user of the agent, defined on the bot-configuration element) if no access right is defined, default to "myuser" if it's defined, else to "all". """ access_rights = [] keys = self.acls_dict().keys() for each_key in keys: if re.match(each_key, cmdname): access_rights.append(self.acls_dict()[each_key]) if not access_rights and (not master_id or user == master_id): return True for access_right in access_rights: if groupchat and not access_right.usable_in_chat: continue access = access_right.access_list() if not access and (not master_id or user == master_id): return True if 'myuser' in access and user == master_id: return True if 'all' in access or user in access: return True return False def access_rights(self, master_id, cmdname): """return the list users having access to the given command :type master_id: str :param master_id: the master identifier :type cmdname: str :param cmdname: the name of the desired command """ # FIXME: return groupchat info try: access = self.acls_dict()[cmdname].access_list() if access: return access except KeyError: pass # no access rights explicitly defined for this command, check if # a master is defined, in which case he's the only one with # access right if master_id: return ['myuser'] # no master defined, everyone has access return ['all'] class DiscussionElement(ALElement): """ :type context: str :ivar context: the thread context identifier, according to the connection and to the sender (user or conference room.) :type user: str :ivar user: the jabber user's name :type maxsize: int :ivar maxsize: messages queue size (the number of messages to remember from the discussion) """ __xml_element__ = (NO_NS, 'discussion') context = NSAttribute(NO_NS, None, str, str) user = NSAttribute(NO_NS, None, str, str) maxsize = NSAttribute(NO_NS, 10, int, str) mode = NSAttribute(NO_NS, 'default', str, str) def __init__(self, **kw): ALElement.__init__(self, **kw) self._queue = [] def __len__(self): return len(self._queue) def __getitem__(self, key): """ >>> el = DiscussionElement() >>> el.append('hello') >>> el.append('how are you ?') >>> el.append('fine') >>> el.append('thank you') >>> el[0] 'thank you' >>> el[-1] 'hello' >>> el[-2] 'how are you ?' >>> el[2] 'how are you ?' """ assert type(key) is type(1) return self._queue[-key - 1] def append(self, message): while len(self._queue) >= self.maxsize: self._queue.pop(0) self._queue.append(message) def get_message(self, index): """get the message at the given index in the queue, IGNORING MESSAGES PRODUCED BY THE AGENT """ i = 0 for msg in self: if msg.get_from_user() == self.user: continue if i == index: return msg i += 1 raise IndexError(index) DEFAULT_DISCUSSION_LOG_FILE_URL = 'file:$NARVAL_HOME/data/bot_log' def get_log_file_base(self, obj=None): """return the path to the discussion log file and it's encoding :param obj: object adaptable to IURL :rtype: tuple(str, str) """ if obj is None: path = normalize_url(self.DEFAULT_DISCUSSION_LOG_FILE_URL)[0] encoding = 'UTF-8' else: # FIXME: check protocol == file: ? iurl = IURL(obj) path = iurl.address encoding = iurl.encoding or 'UTF-8' return path, encoding def get_log_file(self, obj=None): """return the IFile element locating the discussion log file :type fromid: str :param fromid: the sentence'sender jabber user id :param obj: object adaptable to IURL used to get the discussion log file :rtype: `IFile` implementation """ logfile = FileElement() path, logfile.encoding = self.get_log_file_base(obj) logfile.address = '%s.%s' % (path, self.context) return logfile