##############################################################################
#
# Copyright (c) 2001-2004 Zope Corporation and Contributors.
# Copyright (c) 2004 Christian Heimes and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""SMTP mail objects
$Id: SecureMailHost.py 39729 2007-03-25 13:21:14Z hannosch $
"""
from config import BAD_HEADERS
from copy import deepcopy
import email.Message
import email.Header
import email.MIMEText
import email
from email.Utils import getaddresses
from email.Utils import formataddr
import re
from types import StringType, TupleType, ListType
from AccessControl import ClassSecurityInfo
from AccessControl.Permissions import view_management_screens, \
use_mailhost_services
from Globals import Persistent, DTMLFile, InitializeClass
from Products.MailHost.MailHost import MailHostError, MailBase
from Products.SecureMailHost.mail import Mail
class SMTPError(Exception):
    """Exception type for SMTP related failures.

    Adds nothing to Exception; it only provides a distinct class to raise
    and catch.
    """
# Pattern for one bare (already normalized) email address, matched
# case-insensitively: an optional repeated "...!" prefix group, a local part,
# "@", and then either a dotted host name whose last label is 2 to 6 letters
# or four dot-separated groups of 1 to 3 digits.
# NOTE(review): the first group starts with "(\w&" while the local part starts
# with "[\w&" -- it looks like an opening "[" is missing in the prefix group,
# so that group is not the character class it was presumably meant to be.
# Confirm before changing: fixing it would make the pattern accept more input.
EMAIL_RE = re.compile(r"^(\w&.%#$&'\*+-/=?^_`{}|~]+!)*[\w&.%#$&'\*+-/=?^_`{}|~]+@(([0-9a-z]([0-9a-z-]*[0-9a-z])?\.)+[a-z]{2,6}|([0-9]{1,3}\.){3}[0-9]{1,3})$", re.IGNORECASE)
# Used to find a double newline in any variant: a run of non-newline
# characters followed by two consecutive CR/LF characters
# (\n\n, \r\r, \r\n or \n\r).
EMAIL_CUTOFF_RE = re.compile(r".*[\n\r][\n\r]")
# We need to encode user names in email addresses.
# This is especially important for Chinese and other languages.
# Group 1 is the display name (a double-quoted string with optional trailing
# spaces, or a run of characters other than ',', '^', '"' and '>'); group 2 is
# the "<...>" part that directly follows it. Entries without a "<...>" part
# are not matched.
# Illustrative address list (the bracketed addresses are examples only):
#
# aaa<a@example.com>, "a,db"<b@example.com>, apn@zopechina.com, "ff s" <c@example.com>
EMAIL_ADDRESSES_RE = re.compile(r'(".*?" *|[^,^"^>]+?)(<.*?>)')
class MailAddressTransformer:
    """Substitution callable that header-encodes the name part of a match.

    Instances are meant to be passed as the replacement argument of a
    regular expression ``sub`` call whose pattern has two groups: the
    display name (group 1) and the bracketed address (group 2).
    """

    def __init__(self, charset):
        """Remember the charset used to encode display names."""
        self.charset = charset

    def __call__(self, matchobj):
        """Return the encoded name followed by the unchanged address part."""
        name, address = matchobj.group(1, 2)
        encoded_name = email.Header.Header(name, self.charset)
        return str(encoded_name) + address
def encodeHeaderAddress(address, charset):
    """Encode the display names found in an address header value.

    address:
        Header value holding one or more addresses; a false value
        (None or an empty string) is returned unchanged.
    charset:
        Charset handed to MailAddressTransformer for encoding the names.

    Returns the address string with every match of EMAIL_ADDRESSES_RE
    replaced by the transformer's result.
    """
    if not address:
        # nothing to encode; hand back exactly what we were given
        return address
    return EMAIL_ADDRESSES_RE.sub(MailAddressTransformer(charset), address)
def formataddresses(fieldvalues):
    """Join (REALNAME, EMAIL) pairs into one To/Cc style header value.

    fieldvalues:
        Iterable of (REALNAME, EMAIL) pairs.

    Each pair is rendered with formataddr and the results are separated
    by a comma and a space.
    """
    formatted = []
    for pair in fieldvalues:
        formatted.append(formataddr(pair))
    return ', '.join(formatted)
manage_addMailHostForm = DTMLFile('www/addMailHost_form', globals())


def manage_addMailHost(self, id, title='', smtp_host='localhost',
                       smtp_port=25, smtp_userid=None,
                       smtp_pass=None, smtp_notls=None, REQUEST=None):
    """Add a MailHost

    Builds a SecureMailHost from the given settings and stores it in
    ``self`` under ``id``. When a REQUEST is supplied the response is
    redirected to the container's manage_main page.
    """
    mailhost = SecureMailHost(id, title, smtp_host, smtp_port,
                              smtp_userid, smtp_pass, smtp_notls)
    self._setObject(id, mailhost)
    if REQUEST is None:
        # called from code: nothing to redirect
        return
    response = REQUEST['RESPONSE']
    response.redirect(self.absolute_url() + '/manage_main')


add = manage_addMailHost
class SecureMailBase(MailBase):
    """A more secure mailhost with ESMTP features and header checking

    On top of MailBase this class offers secureSend(), which validates
    every address, refuses the headers listed in BAD_HEADERS and hands the
    message to a Mail object configured with the SMTP host, port,
    credentials and TLS preference stored on the instance.
    """
    meta_type = 'Secure Mail Host'
    manage = manage_main = DTMLFile('www/manageMailHost', globals())
    manage_main._setName('manage_main')
    index_html = None
    security = ClassSecurityInfo()

    def __init__(self, id='', title='', smtp_host='localhost',
                 smtp_port=25, smtp_userid='', smtp_pass='',
                 smtp_notls=False):
        """Initialize a new MailHost instance
        """
        self.id = id
        self.setConfiguration(title, smtp_host, smtp_port,
                              smtp_userid, smtp_pass, smtp_notls)

    security.declareProtected('Change configuration', 'manage_makeChanges')
    def manage_makeChanges(self, title, smtp_host, smtp_port,
                           smtp_userid, smtp_pass, smtp_notls=None,
                           REQUEST=None):
        """Make the changes

        Stores the new configuration and, when a REQUEST is given, returns
        the rendered management page with a status message.
        """
        self.setConfiguration(title, smtp_host, smtp_port,
                              smtp_userid, smtp_pass, smtp_notls)
        if REQUEST is not None:
            msg = 'MailHost %s updated' % self.id
            return self.manage_main(self, REQUEST, manage_tabs_message=msg)

    security.declarePrivate('setConfiguration')
    def setConfiguration(self, title, smtp_host, smtp_port,
                         smtp_userid, smtp_pass, smtp_notls):
        """Set configuration

        The host is stored as a string and the port as an integer. A false
        user id or password is stored as None. Both values are kept under
        the public and the underscore-prefixed attribute name. A
        ``smtp_notls`` of None is stored as False.
        """
        self.title = title
        self.smtp_host = str(smtp_host)
        self.smtp_port = int(smtp_port)
        # empty strings are normalized to None
        self._smtp_userid = self.smtp_userid = smtp_userid or None
        self._smtp_pass = self.smtp_pass = smtp_pass or None
        if smtp_notls is None:
            smtp_notls = False
        self.smtp_notls = smtp_notls

    security.declareProtected(use_mailhost_services, 'sendTemplate')
    def sendTemplate(trueself, self, messageTemplate,
                     statusTemplate=None, mto=None, mfrom=None,
                     encode=None, REQUEST=None):
        """Render a mail template, then send it...

        Delegates unchanged to MailBase.sendTemplate; redefined here so it
        carries this class's security declaration.
        """
        return MailBase.sendTemplate(trueself, self, messageTemplate,
                                     statusTemplate=statusTemplate, mto=mto,
                                     mfrom=mfrom, encode=encode,
                                     REQUEST=REQUEST)

    security.declareProtected(use_mailhost_services, 'send')
    def send(self, message, mto=None, mfrom=None, subject=None,
             encode=None):
        """Send email

        Delegates unchanged to MailBase.send.
        """
        return MailBase.send(self, message, mto=mto, mfrom=mfrom,
                             subject=subject, encode=encode)

    security.declareProtected(use_mailhost_services, 'secureSend')
    def secureSend(self, message, mto, mfrom, subject='[No Subject]',
                   mcc=None, mbcc=None, subtype='plain', charset='us-ascii',
                   debug=False, **kwargs):
        """A more secure way to send a message

        message:
            The plain message text without any headers or an
            email.Message.Message based instance
        mto:
            To: field (string or list)
        mfrom:
            From: field
        subject:
            Message subject (default: [No Subject])
        mcc:
            Cc: (carbon copy) field (string or list)
        mbcc:
            Bcc: (blind carbon copy) field (string or list)
        subtype:
            Content subtype of the email e.g. 'plain' for text/plain (ignored
            if message is a email.Message.Message instance)
        charset:
            Charset used for the email, subject and email addresses
        debug:
            Passed on to _send; when true the prepared Mail object is
            returned instead of being sent
        kwargs:
            Additional headers

        Raises MailHostError when an address does not validate or when
        kwargs contains a header listed in BAD_HEADERS.
        """
        mto = self.emailListToString(mto)
        mcc = self.emailListToString(mcc)
        mbcc = self.emailListToString(mbcc)

        # validate email addresses
        # XXX check Return-Path
        for addr in mto, mcc, mbcc:
            if addr and not self.validateEmailAddresses(addr):
                raise MailHostError('Invalid email address: %s' % addr)
        if not self.validateSingleEmailAddress(mfrom):
            raise MailHostError('Invalid email address: %s' % mfrom)

        # create message
        if isinstance(message, email.Message.Message):
            # got an email message. Make a deepcopy because we don't want to
            # change the message
            msg = deepcopy(message)
        else:
            msg = email.MIMEText.MIMEText(message, subtype, charset)

        mfrom = encodeHeaderAddress(mfrom, charset)
        mto = encodeHeaderAddress(mto, charset)
        mcc = encodeHeaderAddress(mcc, charset)
        mbcc = encodeHeaderAddress(mbcc, charset)

        # set important headers
        self.setHeaderOf(msg, skipEmpty=True, From=mfrom, To=mto,
                         Subject=str(email.Header.Header(subject, charset)),
                         Cc=mcc, Bcc=mbcc)

        for bad in BAD_HEADERS:
            if bad in kwargs:
                raise MailHostError('Header %s is forbidden' % bad)
        self.setHeaderOf(msg, **kwargs)

        # we have to pass *all* recipient email addresses to the
        # send method because the smtp server doesn't add CC and BCC to
        # the list of recipients (Resent-To / Resent-Cc are not considered)
        to = msg.get_all('to', [])
        cc = msg.get_all('cc', [])
        bcc = msg.get_all('bcc', [])
        recipient_list = getaddresses(to + cc + bcc)
        all_recipients = [formataddr(pair) for pair in recipient_list]

        # finally send email
        return self._send(mfrom, all_recipients, msg, debug=debug)

    security.declarePrivate('setHeaderOf')
    def setHeaderOf(self, msg, skipEmpty=False, **kwargs):
        """Set the headers of the email.Message based instance

        All occurences of the key are deleted first! With ``skipEmpty`` a
        false value only removes the header. Returns ``msg``.
        """
        for key, val in kwargs.items():
            del msg[key]  # safe - email.Message won't raise a KeyError
            if skipEmpty and not val:
                continue
            msg[key] = val
        return msg

    def _send(self, mfrom, mto, messageText, debug=False):
        """Send the message

        mfrom:
            Sender handed to the Mail object
        mto:
            Recipients handed to the Mail object
        messageText:
            An email.Message.Message based instance, or the complete
            message as a string which is parsed first
        debug:
            When true the prepared Mail object is returned and nothing
            is sent
        """
        # Accept every Message based instance, not only MIMEText: secureSend
        # passes on whatever Message subclass the caller supplied (e.g. a
        # multipart), and such an object must not be parsed as a string.
        if isinstance(messageText, email.Message.Message):
            message = messageText
        else:
            message = email.message_from_string(messageText)
        smtp_notls = getattr(self, 'smtp_notls', False)
        mail = Mail(mfrom, mto, message, smtp_host=self.smtp_host,
                    smtp_port=self.smtp_port, userid=self._smtp_userid,
                    password=self._smtp_pass, notls=smtp_notls
                    )
        if debug:
            return mail
        else:
            mail.send()

    security.declarePublic('emailListToString')
    def emailListToString(self, addr_list):
        """Converts a list of emails to rfc822 conform data

        Input:
            ('email', 'email', ...)
        or
            (('name', 'email'), ('name', 'email'), ...)
        or mixed

        Anything that is not a list or tuple is returned unchanged. A
        non-string entry whose length is not 2 raises ValueError.
        """
        # stage 1: test for type
        if not isinstance(addr_list, (TupleType, ListType)):
            # a string is supposed to be a valid list of email addresses
            # or None
            return addr_list
        # stage 2: get a list of address strings using email.formataddr
        addresses = []
        for addr in addr_list:
            if isinstance(addr, StringType):
                addresses.append(email.Utils.formataddr(('', addr)))
            else:
                if len(addr) != 2:
                    raise ValueError(
                        "Wrong format: ('name', 'email') is required")
                addresses.append(email.Utils.formataddr(addr))
        # stage 3: return the addresses as comma seperated string
        return ', '.join(addresses)

    ######################################################################
    # copied from CMFPlone 2.0.2 PloneTool.py

    security.declarePublic('validateSingleNormalizedEmailAddress')
    def validateSingleNormalizedEmailAddress(self, address):
        """Lower-level function to validate a single normalized email
        address, see validateEmailAddress

        Returns False for non-string input, for input containing a double
        newline and for input that EMAIL_RE does not consume completely.
        An empty string leaves nothing behind either and is accepted.
        """
        if not isinstance(address, StringType):
            return False
        # search, not match: "." stops at a newline, so match would only
        # notice a double newline directly after the first line
        if EMAIL_CUTOFF_RE.search(address) is not None:
            # Address contains two newlines (possible spammer relay attack)
            return False
        # the remainder is an empty string if the address is valid
        return EMAIL_RE.sub('', address) == ''

    security.declarePublic('validateSingleEmailAddress')
    def validateSingleEmailAddress(self, address):
        """Validate a single email address, see also validateEmailAddresses

        The value may carry a display name; it must parse to exactly one
        address and that address must pass
        validateSingleNormalizedEmailAddress.
        """
        if not isinstance(address, StringType):
            return False
        if EMAIL_CUTOFF_RE.search(address) is not None:
            # Address contains two newlines (spammer attack using
            # "address\n\nSpam message")
            return False
        parsed = getaddresses([address])
        if len(parsed) != 1:
            # none or more than one address
            return False
        # Validate the address
        name, addr = parsed[0]
        return self.validateSingleNormalizedEmailAddress(addr)

    security.declarePublic('validateEmailAddresses')
    def validateEmailAddresses(self, addresses):
        """Validate a list of possibly several email addresses, see
        also validateSingleEmailAddress

        ``addresses`` is one string; every address parsed out of it must
        pass validateSingleNormalizedEmailAddress.
        """
        if not isinstance(addresses, StringType):
            return False
        if EMAIL_CUTOFF_RE.search(addresses) is not None:
            # Addresses contains two newlines (spammer attack using
            # "To: list\n\nSpam message")
            return False
        # Validate each address
        for name, addr in getaddresses([addresses]):
            if not self.validateSingleNormalizedEmailAddress(addr):
                return False
        return True

    # copied from CMFPlone 2.0.2 PloneTool.py
    ######################################################################
# Run Zope's InitializeClass (imported from Globals) on the base class
# before the persistent subclass is declared.
InitializeClass(SecureMailBase)
class SecureMailHost(Persistent, SecureMailBase):
    """Persistent version of SecureMailBase."""