##############################################################################
#
# COREBlogFolder.py
# Class for COREBlog2 Folder
#
# Copyright (c) 2005 Atsushi Shibata(shibata@webcore.co.jp).
# All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted, provided that
# the above copyright notice appear in all copies and that both that copyright
# notice and this permission notice appear in supporting documentation, and that
# the name of Atsushi Shibata not be used in advertising or publicity pertaining
# to distribution of the software without specific, written prior permission.
#
# ATSUSHI SHIBAT DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO
# EVENT SHALL SHIBAT ATSUSHI BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
# USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
#
#$Id: COREBlogTool.py 156 2006-01-03 20:49:37Z ats $
#
##############################################################################
from Products.CMFCore.utils import UniqueObject
from Products.CMFCore.utils import getToolByName
from Products.CMFPlone.utils import log
from AccessControl import ClassSecurityInfo
from OFS.SimpleItem import SimpleItem
from Globals import InitializeClass
from Products.COREBlog2.configuration import zconf
#Python modules
import urllib
import string,urllib,httplib,urlparse,re
from xmlrpclib import Server,Transport
import socket
timeout = zconf.coreblog2.ping_timeout_default
def safe_uencode(s):
if isinstance(s,unicode):
s = s.encode('utf-8')
else:
s = unicode(s,'utf-8').encode('utf-8')
return s
class COREBlog2Tool(UniqueObject, SimpleItem):
"""Utility methods for COREBlog2"""
id = 'coreblog2_tool'
meta_type= 'COREBlog2 Tool'
plone_tool = True
security = ClassSecurityInfo()
security.declarePublic('sendPing')
def sendPing(self,
serverurl,
blogtitle,
url,
version_str = "COREBlog2",
char_code = "utf-8",
fromcode=""):
"""
Send PING to PING server.
"""
#blogtitle = convert_charcode(blogtitle,char_code,fromcode)
# Sotre timeout
oldtimeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
svr = Server(serverurl)
Transport.user_agent = version_str
try:
resp = svr.weblogUpdates.ping(blogtitle,url)
except Exception, e:
resp = {'error':str(e)}
socket.setdefaulttimeout(oldtimeout)
return resp
security.declarePublic('sendTrackback')
def sendTrackback(self,
trackback_url,
title="",
src_url="",
blog_name="",
excerpt="",
agent = "COREBlog2",
charset="utf-8",
param_charset="utf-8"):
"""
Try to send trackback request to trackback_url
"""
try:
#To make dictionary for POST
params = {"title": safe_uencode(title),
"url": safe_uencode(src_url),
"blog_name":safe_uencode(blog_name),
}
headers = ({"Content-type": "application/x-www-form-urlencoded",
"User-Agent": agent})
if param_charset:
params["charset"] = param_charset
elif charset:
headers["Content-type"] = headers["Content-type"] + \
"; charset=%s"
# Sotre timeout
oldtimeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
if len(excerpt) > 0:
params["excerpt"] = safe_uencode(excerpt)
tb_url_t = urlparse.urlparse(trackback_url)
enc_params = urllib.urlencode(params)
#check if trackback url contains parameter section(for PyDs!)
ut = urlparse.urlparse(trackback_url)
if len(ut) >= 4 and ut[4]:
#add params to parameter section
enc_params = ut[4] + '&' + enc_params
host = tb_url_t[1]
path = tb_url_t[2]
con = httplib.HTTPConnection(host)
con.request("POST", path, enc_params, headers)
r = con.getresponse()
http_response = r.status
http_reason = r.reason
resp = r.read()
socket.setdefaulttimeout(oldtimeout)
err_code_pat = re.compile("(.*?)",re.DOTALL)
message_pat = re.compile("(.*?)",re.DOTALL)
error_code = 0
message = ""
err_m = err_code_pat.search(resp)
if err_m:
try:
error_code = int(err_m.group(1))
except:
pass
else:
error_code = 1
mes_m = message_pat.search(resp)
if mes_m:
try:
message = mes_m.group(1)
except:
pass
except Exception,e:
error_code = -1
message = str(e)
log( 'COREBlog2Tool/sendTrackback: '
'Some exception occured, %s' % e )
return error_code,message
security.declarePublic('discoverTrackback')
def discoverTrackback(self,url):
"""
Parse trackback:ping url from given url
"""
o = urllib.urlopen(url)
src = o.read()
#Regex pattern
rdfpat = re.compile("",re.DOTALL)
#Pattern for Trackback PING URL
tppat = re.compile("""trackback:ping="([^"]+)""",re.DOTALL)
trackback_ping_url = ""
#Finc Trackback PING URL
for item in rdfpat.findall(src):
m = tppat.search(item)
if m:
trackback_ping_url = m.group(1)
break
return trackback_ping_url
security.declarePublic('guess_encode')
def guess_encode(self,s):
"""
Guess encodings.
possible_encodings should be set in conf file.
"""
possible_encodings = ['utf-8','euc_jp','shift-jis','us-ascii']
guessed_enc = 'us-ascii'
for enc in possible_encodings:
try:
unicode(s,enc)
guessed_enc = enc
break;
except:
pass
return guessed_enc
security.declarePublic('convert_charcode')
def convert_charcode(self,s,fromcodestr=""):
"""
convert charcode (for Japanese)
"""
if type(s) == type(u''):
#unicode -> str
s = s.encode('utf-8')
fromcodestr = code_utf8
retstr = s
try:
import pykf
pykf_fromcodemap = { \
pykf.EUC : "euc-jp",
pykf.SJIS: "shift_jis",
pykf.UTF8: "utf-8",
pykf.JIS : "iso-2022-jp" }
pkf_fromcode = 0
if not fromcodestr:
#fromcode is unknown... so do 'guess'
fromcodestr = self.guess_encode(s)
#Do conversion...
retstr = unicode(s, fromcodestr, "ignore").encode('utf8')
except:
pass
return retstr
security.declarePublic('send_mail')
def send_mail(self, body,to_addr,from_addr,subject):
#
# Method to bypass Zope's security.
# For case of comment/trackback post by anonymous user
#
try:
self.MailHost.send(body,to_addr, from_addr, subject)
except Exception,e:
log( 'COREBlog2/tbping: '
'Some exception occured, %s' % e )
InitializeClass(COREBlog2Tool)