#* Foundation, Inc., *
#* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *
#***************************************************************************
-# $Id: mailman-rpc.py,v 1.10 2004-09-09 13:08:50 x2000habouzit Exp $
+# $Id: mailman-rpc.py,v 1.11 2004-09-09 22:57:38 x2000habouzit Exp $
#***************************************************************************
import base64, MySQLdb
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
-from Mailman import mm_cfg
+import paths
from Mailman import MailList
from Mailman import Utils
+from Mailman import Message
from Mailman import Errors
-from Mailman.i18n import _
-
-class UserDesc: pass
+from Mailman import mm_cfg
+from Mailman import i18n
+from Mailman.UserDesc import UserDesc
class AuthFailed(Exception): pass
def _dispatch(self,method,params):
# TODO: subclass in SimpleXMLRPCDispatcher and not here.
new_params = list(params)
- new_params.insert(0,self.userdesc)
+ new_params.insert(0,self.data)
return self.server._dispatch(method,new_params)
def do_POST(self):
- headers = self.headers
try:
- if not headers.has_key("authorization"):
- raise AuthFailed
- auth = headers["authorization"]
- _, auth = auth.split()
- uid, md5 = base64.decodestring(auth).strip().split(':')
- self.userdesc = self.getUserDesc(uid,md5)
- if self.userdesc is None:
+ _, auth = self.headers["authorization"].split()
+ uid, md5 = base64.decodestring(auth).strip().split(':')
+ self.data = self.getUser(uid,md5)
+ if self.data is None:
raise AuthFailed
# Call super.do_POST() to do the actual work
SimpleXMLRPCRequestHandler.do_POST(self)
self.send_response(401)
self.end_headers()
- def getUserDesc(self, uid, md5):
+ def getUser(self, uid, md5):
mysql.execute ("""SELECT u.prenom,u.nom,a.alias,u.perms
FROM auth_user_md5 AS u
INNER JOIN aliases AS a ON a.id=u.user_id
LIMIT 1""" %( uid, md5 ) )
if int(mysql.rowcount) is 1:
user = mysql.fetchone()
- userdesc = UserDesc()
- userdesc.fullname = user[0]+' '+user[1]
- userdesc.address = user[2]+'@polytechnique.org'
- userdesc.digest = 0
- userdesc.perms = user[3]
- return userdesc
+ userdesc = UserDesc(user[2]+'@polytechnique.org', user[0]+' '+user[1], None, 0)
+ return (userdesc,user[3])
else:
return None
db.ping()
return db.cursor()
-def is_admin_on(userdesc,mlist):
- return ( userdesc.perms == 'admin' ) or ( userdesc.address in mlist.owner )
+def is_admin_on(userdesc,perms,mlist):
+ return ( perms == 'admin' ) or ( userdesc.address in mlist.owner )
#-------------------------------------------------------------------------------
# users procedures
#
-def get_lists(userdesc):
+def get_lists((userdesc,perms)):
names = Utils.list_names()
names.sort()
result = []
for name in names:
try:
mlist = MailList.MailList(name, lock=0)
- except Errors.MMListError, e:
+ except:
continue
is_member = userdesc.address in mlist.getRegularMemberKeys()
is_owner = userdesc.address in mlist.owner
is_admin = mm_cfg.ADMIN_ML_OWNER in mlist.owner
- if ( mlist.advertised ) or ( userdesc.perms == 'admin' and is_admin ) or is_member or is_owner:
+ if ( mlist.advertised ) or ( perms == 'admin' and is_admin ) or is_member or is_owner:
result.append( (name,1-mlist.advertised+is_admin,is_member,is_owner) )
return result
-def get_members(userdesc,listname):
+def get_members((userdesc,perms),listname):
try:
- mlist = MailList.MailList(listname, lock=False)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname, lock=0)
+ except:
return None
members = mlist.getRegularMemberKeys()
- if ( mlist.advertised ) or ( is_admin_on(userdesc, mlist) ) or ( userdesc.address in members ):
+ if ( mlist.advertised ) or ( is_admin_on(userdesc, perms, mlist) ) or ( userdesc.address in members ):
return (members,mlist.owner)
-def subscribe(userdesc,listname):
+def subscribe((userdesc,perms),listname):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return 0
try:
- if ( mlist.subscribe_policy in (0,1) ) or is_admin_on(userdesc,mlist):
+ if ( mlist.subscribe_policy in (0,1) ) or is_admin_on(userdesc, perms, mlist):
result = 2
- mlist.ApprovedAddMember(userdesc)
- mlist.Save()
+ mlist.ApprovedAddMember(userdesc, None, 0)
else:
result = 1
- mlist.AddMember(userdesc,'xml-rpc iface')
+ try:
+ mlist.AddMember(userdesc)
+ except Errors.MMNeedApproval:
+ pass
+ mlist.Save()
except:
result = 0
mlist.Unlock()
return result
-def unsubscribe(userdesc,listname):
- # here : no rights to verify, because if we can delete us ...
- # it's that we are in there, else we delete nobody, so no harm
+def unsubscribe((userdesc,perms),listname):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return 0
try:
- mlist.ApprovedDeleteMember(userdesc.address, 'xml-rpc iface', False, False)
+ mlist.ApprovedDeleteMember(userdesc.address, None, False, False)
mlist.Save()
+ mlist.Unlock()
+ return 1
except:
mlist.Unlock()
return 0
- mlist.Unlock()
- return 1
#-------------------------------------------------------------------------------
# owners procedures
#
-def mass_subscribe(userdesc,listname,users):
+def mass_subscribe((userdesc,perms),listname,users):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return None
try:
- if not is_admin_on(userdesc,mlist):
+ if not is_admin_on(userdesc, perms, mlist):
return None
added = []
LIMIT 1""" %( user ) )
if int(mysql.rowcount) is 1:
row = mysql.fetchone()
- userd = UserDesc()
- userd.fullname = row[0]
- userd.address = user+'@polytechnique.org'
- userd.digest = 0
+ userd = UserDesc(user+'@polytechnique.org', row[0], None, 0)
mlist.ApprovedAddMember(userd)
added.append( (userd.fullname, userd.address) )
- except:
- pass
- mlist.Save()
- mlist.Unlock()
- return added
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ return added
-def mass_unsubscribe(userdesc,listname,users):
+def mass_unsubscribe((userdesc,prems),listname,users):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return None
try:
- if not is_admin_on(userdesc,mlist):
+ if not is_admin_on(userdesc, perms, mlist):
return None
deleted = []
for user in users:
- mlist.ApprovedDeleteMember(user+'@polytechnique.org', 'xml-rpc iface', False, False)
+ mlist.ApprovedDeleteMember(user+'@polytechnique.org', None, False, False)
deleted.append( user )
- except:
- pass
- mlist.Save()
- mlist.Unlock()
- return deleted
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ return deleted
-def add_owner(userdesc,listname,user):
+def add_owner((userdesc,perms),listname,user):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return None
try:
- if not is_admin_on(userdesc,mlist):
+ if not is_admin_on(userdesc, perms, mlist):
return None
addr = user+'@polytechnique.org'
if addr not in mlist.owner:
mlist.owner.append(addr)
- except:
- pass
- mlist.Save()
- mlist.Unlock()
- return True
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ return True
-def del_owner(userdesc,listname,user):
+def del_owner((userdesc,perms),listname,user):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return None
try:
- if not is_admin_on(userdesc,mlist):
+ if not is_admin_on(userdesc, perms, mlist):
return None
mlist.owner.remove(user+'@polytechnique.org')
- except:
- pass
- mlist.Save()
- mlist.Unlock()
- return True
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ return True
-def get_welcome(userdesc,listname):
+def get_welcome((userdesc,perms),listname):
try:
mlist = MailList.MailList(listname, lock=0)
- except Errors.MMListError, e:
+ except:
return None
- if not is_admin_on(userdesc,mlist):
+ if not is_admin_on(userdesc, perms, mlist):
return None
return mlist.info
-def set_welcome(userdesc,listname,info):
+def set_welcome((userdesc,perms),listname,info):
try:
- mlist = MailList.MailList(listname, lock=True)
- except Errors.MMListError, e:
+ mlist = MailList.MailList(listname)
+ except:
return None
try:
- if not is_admin_on(userdesc,mlist):
+ if not is_admin_on(userdesc, perms, mlist):
return None
mlist.info = info
- except:
- pass
- mlist.Save()
- mlist.Unlock()
- return True
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ return True
################################################################################
#
mysql = connectDB()
server = SimpleXMLRPCServer(("localhost", 4949), BasicAuthXMLRPCRequestHandler)
+
server.register_function(get_lists)
server.register_function(get_members)
server.register_function(subscribe)
server.register_function(del_owner)
server.register_function(get_welcome)
server.register_function(set_welcome)
-#server.register_introspection_functions()
+
server.serve_forever()
# vim:set et: