#!/usr/bin/env python
#***************************************************************************
-#* Copyright (C) 2004-2008 polytechnique.org *
+#* Copyright (C) 2003-2010 Polytechnique.org *
#* http://opensource.polytechnique.org/ *
#* *
#* This program is free software; you can redistribute it and/or modify *
return config.get(sec, val)[1:-1]
except ConfigParser.NoOptionError, e:
if default is None:
- print e
+ sys.stderr.write('%s\n' % str(e))
sys.exit(1)
else:
return default
MYSQL_USER = get_config('Core', 'dbuser')
MYSQL_PASS = get_config('Core', 'dbpwd')
+MYSQL_DB = get_config('Core', 'dbdb')
PLATAL_DOMAIN = get_config('Mail', 'domain')
PLATAL_DOMAIN2 = get_config('Mail', 'domain2', '')
+sys.stderr.write('PLATAL_DOMAIN = %s\n' % PLATAL_DOMAIN )
VHOST_SEP = get_config('Lists', 'vhost_sep', '_')
ON_CREATE_CMD = get_config('Lists', 'on_create', '')
except:
raise Exception('method "%s" is not supported' % method)
+ def is_rpc_path_valid(self):
+ return True
def _dispatch(self, method, params):
- new_params = list(params)
return list_call_dispatcher(self._get_function(method), self.data[0], self.data[1], self.data[2], *params)
def do_POST(self):
self.end_headers()
def getUser(self, uid, md5, vhost):
- res = mysql_fetchone ("""SELECT CONCAT(u.prenom, ' ', u.nom), a.alias, u.perms
- FROM auth_user_md5 AS u
- INNER JOIN aliases AS a ON ( a.id=u.user_id AND a.type='a_vie' )
- WHERE u.user_id = '%s' AND u.password = '%s' AND u.perms IN ('admin', 'user')
- LIMIT 1""" %( uid, md5 ) )
+ res = mysql_fetchone ("""SELECT a.full_name, IF(aa.alias IS NULL, a.email, CONCAT(aa.alias, '@%s')),
+ IF (a.is_admin, 'admin',
+ IF(FIND_IN_SET('lists', at.perms) OR FIND_IN_SET('lists', a.user_perms), 'lists', NULL))
+ FROM accounts AS a
+ INNER JOIN account_types AS at ON (at.type = a.type)
+ LEFT JOIN aliases AS aa ON (a.uid = aa.uid AND aa.type = 'a_vie')
+ WHERE a.uid = '%s' AND a.password = '%s' AND a.state = 'active'
+ LIMIT 1""" \
+ % (PLATAL_DOMAIN, uid, md5))
if res:
name, forlife, perms = res
if vhost != PLATAL_DOMAIN:
- res = mysql_fetchone ("""SELECT uid
- FROM groupex.membres AS m
- INNER JOIN groupex.asso AS a ON (m.asso_id = a.id)
- WHERE perms='admin' AND uid='%s' AND mail_domain='%s'""" %( uid , vhost ) )
- if res: perms= 'admin'
- userdesc = UserDesc(forlife+'@'+PLATAL_DOMAIN, name, None, 0)
+ res = mysql_fetchone ("""SELECT m.uid, IF(m.perms = 'admin', 'admin', 'lists')
+ FROM group_members AS m
+ INNER JOIN groups AS g ON (m.asso_id = g.id)
+ WHERE uid = '%s' AND mail_domain = '%s'""" \
+ % (uid, vhost))
+ if res:
+ _, perms = res
+ userdesc = UserDesc(forlife, name, None, 0)
return (userdesc, perms, vhost)
else:
return None
def connectDB():
db = MySQLdb.connect(
- db='x4dat',
+ db=MYSQL_DB,
user=MYSQL_USER,
passwd=MYSQL_PASS,
unix_socket='/var/run/mysqld/mysqld.sock')
mbox = email
fqdn = PLATAL_DOMAIN
if ( fqdn == PLATAL_DOMAIN ) or ( fqdn == PLATAL_DOMAIN2 ):
- res = mysql_fetchone("""SELECT CONCAT(f.alias, '@%s'), CONCAT(u.prenom, ' ', u.nom)
- FROM auth_user_md5 AS u
- INNER JOIN aliases AS f ON (f.id=u.user_id AND f.type='a_vie')
- INNER JOIN aliases AS a ON (a.id=u.user_id AND a.alias='%s' AND a.type!='homonyme')
- WHERE u.perms IN ('admin', 'user')
- LIMIT 1""" %( PLATAL_DOMAIN, mbox ) )
+ res = mysql_fetchone("""SELECT CONCAT(f.alias, '@%s'), a.full_name
+ FROM accounts AS a
+ INNER JOIN aliases AS f ON (f.uid = a.uid AND f.type = 'a_vie')
+ INNER JOIN aliases AS aa ON (aa.uid = a.uid AND aa.alias = '%s'
+ AND a.type != 'homonyme')
+ WHERE a.state = 'active'
+ LIMIT 1""" \
+ % (PLATAL_DOMAIN, mbox))
if res:
return res
else:
else:
return method(userdesc, perms, vhost, *arg)
except Exception, e:
+ sys.stderr.write('Exception in dispatcher %s\n' % str(e))
raise e
return 0
mlist.Save()
mlist.Unlock()
return ret
- except:
+ except Exception, e:
+ sys.stderr.write('Exception in locked call %s: %s\n' % (method.__name__, str(e)))
mlist.Unlock()
return 0
# TODO: use finally when switching to python 2.5
# helpers on lists
#
-def is_subscription_pending(userdesc, perms, mlist, edit):
+def is_subscription_pending(userdesc, perms, mlist):
for id in mlist.GetSubscriptionIds():
if userdesc.address == mlist.GetRecord(id)[1]:
return True
members = mlist.getRegularMemberKeys()
is_member = userdesc.address in members
is_owner = userdesc.address in mlist.owner
- if mlist.advertised or is_member or is_owner or (not front_page and perms == 'admin'):
+ if (mlist.advertised and perms in ('lists', 'admin')) or is_member or is_owner or (not front_page and perms == 'admin'):
is_pending = False
if not is_member and (mlist.subscribe_policy > 1):
- is_pending = list_call_locked(userdesc, perms, mlist, is_subscription_pending, False)
+ is_pending = list_call_locked(is_subscription_pending, userdesc, perms, mlist, False)
if is_pending is 0:
return 0
details = get_list_info(userdesc, perms, mlist)[0]
return (details, options)
-def set_options(userdesc, perms, mlist, vals):
- """ Set the options of a list.
- @mlist
- @edit
- @admin
- """
+def set_options(userdesc, perms, mlist, opts, vals):
for (k, v) in vals.iteritems():
if k not in opts:
continue
try:
details = get_list_info(udesc, perms, mlist, (email is None and vhost == PLATAL_DOMAIN))[0]
result.append(details)
- except:
+ except Exception, e:
+ sys.stderr.write('Can\'t get list %s: %s\n' % (name, str(e)))
continue
return result
"""
members = mlist.getRegularMemberKeys()
added = []
- mlist.Lock()
for user in users:
email, name = to_forlife(user)
if ( email is None ) or ( email in members ):
filterlevel = 3
return (filterlevel << 1) + unsurelevel
-def set_bogo_level(userdesc, perms, vhost, listname, level):
+def set_bogo_level(userdesc, perms, mlist, level):
""" Set filter to the specified level.
@mlist
@edit
if mlist.real_name.lower() != listname:
options['real_name'] = listname, mlist.real_name
if correct: mlist.real_name = listname
- details = get_list_info(userdesc, perms, mlist)[0]
- return (details, options)
+ return 1
def check_options(userdesc, perms, vhost, listname, correct=False):
def get_all_lists(userdesc, perms, vhost):
""" Get all the list for the given vhost
+ @root
"""
prefix = vhost.lower()+VHOST_SEP
names = Utils.list_names()
result.append(name.replace(prefix, ''))
return result
+def get_all_user_lists(userdesc, perms, vhost, email):
+ """ Get all the lists for the given user
+ @root
+ """
+ names = Utils.list_names()
+ names.sort()
+ result = []
+ for name in names:
+ try:
+ mlist = MailList.MailList(name, lock=0)
+ ismember = email in mlist.getRegularMemberKeys()
+ isowner = email in mlist.owner
+ if not ismember and not isowner:
+ continue
+ host = mlist.internal_name().split(VHOST_SEP)[0].lower()
+ result.append({ 'list': mlist.real_name,
+ 'addr': mlist.real_name.lower() + '@' + host,
+ 'host': host,
+ 'own' : isowner,
+ 'sub' : ismember
+ })
+ except Exception, e:
+ continue
+ return result
+
def create_list(userdesc, perms, vhost, listname, desc, advertise, modlevel, inslevel, owners, members):
""" Create a new list.
@root
mlist.header_filter_rules = []
mlist.header_filter_rules.append(('X-Spam-Flag: Unsure, tests=bogofilter', mm_cfg.HOLD, False))
mlist.header_filter_rules.append(('X-Spam-Flag: Yes, tests=bogofilter', mm_cfg.HOLD, False))
- mlist.Save()
- mlist.Unlock()
if ON_CREATE_CMD != '':
try: os.system(ON_CREATE_CMD + ' ' + name)
except: pass
- check_options(userdesc, perms, mlist, True)
+ check_options_runner(userdesc, perms, mlist, listname.lower(), True)
mass_subscribe(userdesc, perms, mlist, members)
+ mlist.Save()
+ finally:
+ mlist.Unlock()
- # avoid the "-1 mail to moderate" bug
- mlist = MailList.MailList(name)
+ # avoid the "-1 mail to moderate" bug
+ mlist = MailList.MailList(name)
+ try:
mlist._UpdateRecords()
mlist.Save()
-
- return 1
finally:
mlist.Unlock()
- return 0
+ return 1
def delete_list(userdesc, perms, mlist, del_archives=0):
""" Delete the list.
server.register_function(check_options)
# create + del
server.register_function(get_all_lists)
+server.register_function(get_all_user_lists)
server.register_function(create_list)
server.register_function(delete_list)
# utilisateurs.php