#!/usr/bin/env python
#***************************************************************************
-#* Copyright (C) 2004-2008 polytechnique.org *
+#* Copyright (C) 2003-2014 Polytechnique.org *
#* http://opensource.polytechnique.org/ *
#* *
#* This program is free software; you can redistribute it and/or modify *
import base64, MySQLdb, os, getopt, sys, sha, signal, re, shutil, ConfigParser
import MySQLdb.converters
import SocketServer
+import errno
+import traceback
sys.path.append('/usr/lib/mailman/bin')
MYSQL_USER = get_config('Core', 'dbuser')
MYSQL_PASS = get_config('Core', 'dbpwd')
+MYSQL_HOST = get_config('Core', 'dbhost')
+MYSQL_DB = get_config('Core', 'dbdb')
PLATAL_DOMAIN = get_config('Mail', 'domain')
PLATAL_DOMAIN2 = get_config('Mail', 'domain2', '')
sys.stderr.write('PLATAL_DOMAIN = %s\n' % PLATAL_DOMAIN )
+sys.stderr.write("MYSQL_DB = %s\n" % MYSQL_DB)
VHOST_SEP = get_config('Lists', 'vhost_sep', '_')
ON_CREATE_CMD = get_config('Lists', 'on_create', '')
except:
raise Exception('method "%s" is not supported' % method)
+ def is_rpc_path_valid(self):
+ return True
def _dispatch(self, method, params):
- new_params = list(params)
return list_call_dispatcher(self._get_function(method), self.data[0], self.data[1], self.data[2], *params)
def do_POST(self):
self.end_headers()
def getUser(self, uid, md5, vhost):
- res = mysql_fetchone ("""SELECT CONCAT(u.prenom, ' ', u.nom), a.alias, u.perms
- FROM auth_user_md5 AS u
- INNER JOIN aliases AS a ON ( a.id=u.user_id AND a.type='a_vie' )
- WHERE u.user_id = '%s' AND u.password = '%s' AND u.perms IN ('admin', 'user')
- LIMIT 1""" %( uid, md5 ) )
+ res = mysql_fetchone ("""SELECT a.full_name, IF(s.email IS NULL, a.email, CONCAT(s.email, '@%s')),
+ IF (a.is_admin, 'admin',
+ IF(FIND_IN_SET('lists', at.perms) OR FIND_IN_SET('lists', a.user_perms), 'lists', NULL))
+ FROM accounts AS a
+ INNER JOIN account_types AS at ON (at.type = a.type)
+ LEFT JOIN email_source_account AS s ON (s.uid = a.uid AND s.type = 'forlife')
+ WHERE a.uid = '%s' AND a.password = '%s' AND a.state = 'active'
+ LIMIT 1""" \
+ % (PLATAL_DOMAIN, uid, md5))
if res:
name, forlife, perms = res
- if vhost != PLATAL_DOMAIN:
- res = mysql_fetchone ("""SELECT uid
- FROM groupex.membres AS m
- INNER JOIN groupex.asso AS a ON (m.asso_id = a.id)
- WHERE perms='admin' AND uid='%s' AND mail_domain='%s'""" %( uid , vhost ) )
- if res: perms= 'admin'
- userdesc = UserDesc(forlife+'@'+PLATAL_DOMAIN, name, None, 0)
+ if vhost != PLATAL_DOMAIN and perms != 'admin':
+ res = mysql_fetchone ("""SELECT m.uid, IF(m.perms = 'admin', 'admin', 'lists')
+ FROM group_members AS m
+ INNER JOIN groups AS g ON (m.asso_id = g.id)
+ WHERE uid = '%s' AND mail_domain = '%s'""" \
+ % (uid, vhost))
+ if res:
+ _, perms = res
+ userdesc = UserDesc(forlife, name, None, 0)
return (userdesc, perms, vhost)
else:
+ print >> sys.stderr, "no user found for uid: %s, passwd: %s" % (uid, md5)
return None
################################################################################
def connectDB():
db = MySQLdb.connect(
- db='x4dat',
+ db=MYSQL_DB,
user=MYSQL_USER,
passwd=MYSQL_PASS,
- unix_socket='/var/run/mysqld/mysqld.sock')
+ host=MYSQL_HOST)
db.ping()
+ db.autocommit(True)
return db.cursor()
def mysql_fetchone(query):
mbox = email
fqdn = PLATAL_DOMAIN
if ( fqdn == PLATAL_DOMAIN ) or ( fqdn == PLATAL_DOMAIN2 ):
- res = mysql_fetchone("""SELECT CONCAT(f.alias, '@%s'), CONCAT(u.prenom, ' ', u.nom)
- FROM auth_user_md5 AS u
- INNER JOIN aliases AS f ON (f.id=u.user_id AND f.type='a_vie')
- INNER JOIN aliases AS a ON (a.id=u.user_id AND a.alias='%s' AND a.type!='homonyme')
- WHERE u.perms IN ('admin', 'user')
- LIMIT 1""" %( PLATAL_DOMAIN, mbox ) )
+ res = mysql_fetchone("""SELECT CONCAT(s1.email, '@%s'), a.full_name
+ FROM accounts AS a
+ INNER JOIN email_source_account AS s1 ON (a.uid = s1.uid AND s1.type = 'forlife')
+ INNER JOIN email_source_account AS s2 ON (a.uid = s2.uid AND s2.email = '%s')
+ WHERE a.state = 'active'
+ LIMIT 1""" \
+ % (PLATAL_DOMAIN, mbox))
if res:
return res
else:
@root: the handler requires site admin rights
"""
try:
+ print >> sys.stderr, "calling method: %s" % method
if has_annotation(method, "root") and perms != "admin":
return 0
if has_annotation(method, "mlist"):
- listname = arg[0]
+ listname = str(arg[0])
arg = arg[1:]
mlist = MailList.MailList(vhost + VHOST_SEP + listname.lower(), lock=0)
if has_annotation(method, "admin") and not is_admin_on(userdesc, perms, mlist):
mlist.Unlock()
return ret
except Exception, e:
+ traceback.print_exc(file=sys.stderr)
sys.stderr.write('Exception in locked call %s: %s\n' % (method.__name__, str(e)))
mlist.Unlock()
return 0
members = mlist.getRegularMemberKeys()
is_member = userdesc.address in members
is_owner = userdesc.address in mlist.owner
- if mlist.advertised or is_member or is_owner or (not front_page and perms == 'admin'):
+ if (mlist.advertised and perms in ('lists', 'admin')) or is_member or is_owner or (not front_page and perms == 'admin'):
is_pending = False
if not is_member and (mlist.subscribe_policy > 1):
is_pending = list_call_locked(is_subscription_pending, userdesc, perms, mlist, False)
if is_pending is 0:
- return 0
+ return None
host = mlist.internal_name().split(VHOST_SEP)[0].lower()
details = {
'nbsub': len(members)
}
return (details, members)
- return 0
+ return None
def get_options(userdesc, perms, mlist, opts):
""" Get the options of a list.
details = get_list_info(userdesc, perms, mlist)[0]
return (details, options)
-def set_options(userdesc, perms, mlist, vals):
- """ Set the options of a list.
- @mlist
- @edit
- @admin
- """
+def set_options(userdesc, perms, mlist, opts, vals):
for (k, v) in vals.iteritems():
if k not in opts:
continue
except:
continue
try:
- details = get_list_info(udesc, perms, mlist, (email is None and vhost == PLATAL_DOMAIN))[0]
- result.append(details)
+ details = get_list_info(udesc, perms, mlist, (email is None and vhost == PLATAL_DOMAIN))
+ if details is not None:
+ result.append(details[0])
except Exception, e:
sys.stderr.write('Can\'t get list %s: %s\n' % (name, str(e)))
continue
""" List the members of a list.
@mlist
"""
- details, members = get_list_info(userdesc, perms, mlist)
+ infos = get_list_info(userdesc, perms, mlist)
+ if infos is None:
+ # Do not return None, this is not serializable
+ return 0
+ details, members = infos
members.sort()
members = map(lambda member: (get_name(member), member), members)
return (details, members, mlist.owner)
@edit
@admin
"""
+ if isinstance(users, dict):
+ users = users.values()
+ if not isinstance(users, list):
+ raise Exception("userlist must be a list")
members = mlist.getRegularMemberKeys()
added = []
for user in users:
@edit
@admin
"""
+ # Force encoding to mailman's default for french, since this is what
+ # Mailman will use internally
+ # LC_DESCRIPTIONS is a dict of lang => (name, charset, direction) tuples.
+ encoding = mm_cfg.LC_DESCRIPTIONS['fr'][1]
+ comment = comment.encode(encoding, 'replace')
mlist.HandleRequest(int(id), int(value), comment)
return 1
filterlevel = 3
return (filterlevel << 1) + unsurelevel
-def set_bogo_level(userdesc, perms, vhost, listname, level):
+def set_bogo_level(userdesc, perms, mlist, level):
""" Set filter to the specified level.
@mlist
@edit
def get_all_lists(userdesc, perms, vhost):
""" Get all the list for the given vhost
+ @root
"""
prefix = vhost.lower()+VHOST_SEP
names = Utils.list_names()
result.append(name.replace(prefix, ''))
return result
+def get_all_user_lists(userdesc, perms, vhost, email):
+ """ Get all the lists for the given user
+ @root
+ """
+ names = Utils.list_names()
+ names.sort()
+ result = []
+ for name in names:
+ try:
+ mlist = MailList.MailList(name, lock=0)
+ ismember = email in mlist.getRegularMemberKeys()
+ isowner = email in mlist.owner
+ if not ismember and not isowner:
+ continue
+ host = mlist.internal_name().split(VHOST_SEP)[0].lower()
+ result.append({ 'list': mlist.real_name,
+ 'addr': mlist.real_name.lower() + '@' + host,
+ 'host': host,
+ 'own' : isowner,
+ 'sub' : ismember
+ })
+ except Exception, e:
+ continue
+ return result
+
+def change_user_email(userdesc, perms, vhost, from_email, to_email):
+ """ Change the email of a user
+ @root
+ """
+ from_email = from_email.lower()
+ to_email = to_email.lower()
+ for list in Utils.list_names():
+ try:
+ mlist = MailList.MailList(list, lock=0)
+ except:
+ continue
+ try:
+ mlist.Lock()
+ mlist.ApprovedChangeMemberAddress(from_email, to_email, 0)
+ mlist.Save()
+ mlist.Unlock()
+ except:
+ mlist.Unlock()
+ return 1
+
+
def create_list(userdesc, perms, vhost, listname, desc, advertise, modlevel, inslevel, owners, members):
""" Create a new list.
@root
"""
name = vhost.lower() + VHOST_SEP + listname.lower();
if Utils.list_exists(name):
+ print >> sys.stderr, "List ", name, " already exists"
return 0
owner = []
for o in owners:
- email = to_forlife(o)[0]
+ email = to_forlife(o)
+ print >> sys.stderr, "owner in list", o, email
+ email = email[0]
if email is not None:
owner.append(email)
if len(owner) is 0:
+ print >> sys.stderr, "No owner found in ", owners
return 0
mlist = MailList.MailList()
# avoid the "-1 mail to moderate" bug
mlist = MailList.MailList(name)
- mlist._UpdateRecords()
- mlist.Save()
+ try:
+ mlist._UpdateRecords()
+ mlist.Save()
+ finally:
+ mlist.Unlock()
return 1
def delete_list(userdesc, perms, mlist, del_archives=0):
server.register_function(check_options)
# create + del
server.register_function(get_all_lists)
+server.register_function(get_all_user_lists)
+server.register_function(change_user_email)
server.register_function(create_list)
server.register_function(delete_list)
# utilisateurs.php