config = ConfigParser.ConfigParser()
config.read(os.path.dirname(__file__)+'/../configs/platal.conf')
-def get_config(sec,val,default=None):
+def get_config(sec, val, default=None):
try:
return config.get(sec, val)[1:-1]
except ConfigParser.NoOptionError, e:
PLATAL_DOMAIN = get_config('Mail', 'domain')
PLATAL_DOMAIN2 = get_config('Mail', 'domain2', '')
-ML_OWNER = get_config('Lists', 'admin_owner')
VHOST_SEP = get_config('Lists', 'vhost_sep', '_')
ON_CREATE_CMD = get_config('Lists', 'on_create', '')
argument a UserDesc taken from the database, containing name, email and perms
"""
- def _dispatch(self,method,params):
+ def _dispatch(self, method, params):
# TODO: subclass in SimpleXMLRPCDispatcher and not here.
new_params = list(params)
- new_params.insert(0,self.data[2])
- new_params.insert(0,self.data[1])
- new_params.insert(0,self.data[0])
- return self.server._dispatch(method,new_params)
+ new_params.insert(0, self.data[2])
+ new_params.insert(0, self.data[1])
+ new_params.insert(0, self.data[0])
+ return self.server._dispatch(method, new_params)
def do_POST(self):
try:
_, auth = self.headers["authorization"].split()
uid, md5 = base64.decodestring(auth).strip().split(':')
vhost = self.path.split('/')[1].lower()
- self.data = self.getUser(uid,md5,vhost)
+ self.data = self.getUser(uid, md5, vhost)
if self.data is None:
raise AuthFailed
# Call super.do_POST() to do the actual work
self.end_headers()
def getUser(self, uid, md5, vhost):
- res = mysql_fetchone ("""SELECT CONCAT(u.prenom, ' ',u.nom),a.alias,u.perms
+ res = mysql_fetchone ("""SELECT CONCAT(u.prenom, ' ', u.nom), a.alias, u.perms
FROM auth_user_md5 AS u
INNER JOIN aliases AS a ON ( a.id=u.user_id AND a.type='a_vie' )
- WHERE u.user_id = '%s' AND u.password = '%s' AND u.perms IN ('admin','user')
+ WHERE u.user_id = '%s' AND u.password = '%s' AND u.perms IN ('admin', 'user')
LIMIT 1""" %( uid, md5 ) )
if res:
- name,forlife,perms = res
+ name, forlife, perms = res
if vhost != PLATAL_DOMAIN:
res = mysql_fetchone ("""SELECT uid
FROM groupex.membres AS m
WHERE perms='admin' AND uid='%s' AND mail_domain='%s'""" %( uid , vhost ) )
if res: perms= 'admin'
userdesc = UserDesc(forlife+'@'+PLATAL_DOMAIN, name, None, 0)
- return (userdesc,perms,vhost)
+ return (userdesc, perms, vhost)
else:
return None
-
+
################################################################################
#
# XML RPC STUFF
lock.release()
return ret
-def is_owner(userdesc,perms,mlist):
- return ( perms == 'admin' and ML_OWNER in mlist.owner ) or ( userdesc.address in mlist.owner )
-
-def is_admin_on(userdesc,perms,mlist):
+def is_admin_on(userdesc, perms, mlist):
return ( perms == 'admin' ) or ( userdesc.address in mlist.owner )
-def quote(s,is_header=False):
+def quote(s, is_header=False):
if is_header:
- h = Utils.oneline(s,'iso-8859-1')
+ h = Utils.oneline(s, 'iso-8859-1')
else:
h = s
h = str('').join(re.split('[\x00-\x1f]+', s))
- return Utils.uquote(h.replace('&','&').replace('>','>').replace('<','<'))
+ return Utils.uquote(h.replace('&', '&').replace('>', '>').replace('<', '<'))
def to_forlife(email):
try:
- mbox,fqdn = email.split('@')
+ mbox, fqdn = email.split('@')
except:
mbox = email
fqdn = PLATAL_DOMAIN
if ( fqdn == PLATAL_DOMAIN ) or ( fqdn == PLATAL_DOMAIN2 ):
- res = mysql_fetchone("""SELECT CONCAT(f.alias,'@%s'), CONCAT(u.prenom,' ',u.nom)
+ res = mysql_fetchone("""SELECT CONCAT(f.alias, '@%s'), CONCAT(u.prenom, ' ', u.nom)
FROM auth_user_md5 AS u
INNER JOIN aliases AS f ON (f.id=u.user_id AND f.type='a_vie')
INNER JOIN aliases AS a ON (a.id=u.user_id AND a.alias='%s' AND a.type!='homonyme')
- WHERE u.perms IN ('admin','user')
+ WHERE u.perms IN ('admin', 'user')
LIMIT 1""" %( PLATAL_DOMAIN, mbox ) )
if res:
return res
else:
- return (None,None)
- return (email,mbox)
+ return (None, None)
+ return (email, mbox)
##
# see /usr/lib/mailman/bin/rmlist
os.unlink(filename)
elif os.path.isdir(filename):
shutil.rmtree(filename)
-
+
#-------------------------------------------------------------------------------
# helpers on lists
#
-def get_list_info(userdesc,perms,mlist,front_page=0):
+def get_list_info(userdesc, perms, mlist, front_page=0):
members = mlist.getRegularMemberKeys()
is_member = userdesc.address in members
- is_admin = ML_OWNER in mlist.owner
- is_owner = ( perms == 'admin' and is_admin ) or ( userdesc.address in mlist.owner )
+ is_owner = userdesc.address in mlist.owner
if mlist.advertised or is_member or is_owner or (not front_page and perms == 'admin'):
is_pending = False
if not is_member and (mlist.subscribe_policy > 1):
'info' : quote(mlist.info),
'diff' : (mlist.default_member_moderation>0) + (mlist.generic_nonmember_action>0),
'ins' : mlist.subscribe_policy > 1,
- 'priv' : (1-mlist.advertised)+2*is_admin,
+ 'priv' : 1-mlist.advertised,
'sub' : 2*is_member + is_pending,
'own' : is_owner,
'nbsub': len(members)
}
- return (details,members)
+ return (details, members)
return 0
-def get_options(userdesc,perms,vhost,listname,opts):
+def get_options(userdesc, perms, vhost, listname, opts):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if not is_admin_on(userdesc, perms, mlist):
return 0
options = { }
- for (k,v) in mlist.__dict__.iteritems():
+ for (k, v) in mlist.__dict__.iteritems():
if k in opts:
if type(v) is str:
options[k] = quote(v)
else: options[k] = v
- details = get_list_info(userdesc,perms,mlist)[0]
- return (details,options)
+ details = get_list_info(userdesc, perms, mlist)[0]
+ return (details, options)
except:
return 0
-def set_options(userdesc,perms,vhost,listname,opts,vals):
+def set_options(userdesc, perms, vhost, listname, opts, vals):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if not is_admin_on(userdesc, perms, mlist):
return 0
mlist.Lock()
- for (k,v) in vals.iteritems():
+ for (k, v) in vals.iteritems():
if k not in opts:
continue
if k == 'default_member_moderation':
t = type(mlist.__dict__[k])
if t is bool: mlist.__dict__[k] = bool(v)
elif t is int: mlist.__dict__[k] = int(v)
- elif t is str: mlist.__dict__[k] = Utils.uncanonstr(v,'fr')
+ elif t is str: mlist.__dict__[k] = Utils.uncanonstr(v, 'fr')
else: mlist.__dict__[k] = v
mlist.Save()
mlist.Unlock()
# users procedures for [ index.php ]
#
-def get_lists(userdesc,perms,vhost,email=None):
+def get_lists(userdesc, perms, vhost, email=None):
if email is None:
udesc = userdesc
else:
if not name.startswith(prefix):
continue
try:
- mlist = MailList.MailList(name,lock=0)
+ mlist = MailList.MailList(name, lock=0)
except:
continue
try:
- details = get_list_info(udesc,perms,mlist,(email is None and vhost == PLATAL_DOMAIN))[0]
+ details = get_list_info(udesc, perms, mlist, (email is None and vhost == PLATAL_DOMAIN))[0]
result.append(details)
except:
continue
return result
-def subscribe(userdesc,perms,vhost,listname):
+def subscribe(userdesc, perms, vhost, listname):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
mlist.Lock()
- if ( mlist.subscribe_policy in (0,1) ) or is_owner(userdesc,perms,mlist):
+ if ( mlist.subscribe_policy in (0, 1) ) or userdesc.address in mlist.owner:
mlist.ApprovedAddMember(userdesc)
result = 2
else:
mlist.Unlock()
return result
-def unsubscribe(userdesc,perms,vhost,listname):
+def unsubscribe(userdesc, perms, vhost, listname):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
# users procedures for [ index.php ]
#
-def get_members(userdesc,perms,vhost,listname):
+def get_members(userdesc, perms, vhost, listname):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
- details,members = get_list_info(userdesc,perms,mlist)
+ details, members = get_list_info(userdesc, perms, mlist)
members.sort()
members = map(lambda member: (quote(mlist.getMemberName(member)) or '', member), members)
- return (details,members,mlist.owner)
+ return (details, members, mlist.owner)
except:
return 0
# users procedures for [ trombi.php ]
#
-def get_members_limit(userdesc,perms,vhost,listname,page,nb_per_page):
+def get_members_limit(userdesc, perms, vhost, listname, page, nb_per_page):
try:
- members = get_members(userdesc,perms,vhost,listname.lower())[1]
+ members = get_members(userdesc, perms, vhost, listname.lower())[1]
except:
return 0
i = int(page) * int(nb_per_page)
return (len(members), members[i:i+int(nb_per_page)])
-def get_owners(userdesc,perms,vhost,listname):
+def get_owners(userdesc, perms, vhost, listname):
try:
- details,members,owners = get_members(userdesc,perms,vhost,listname.lower())
+ details, members, owners = get_members(userdesc, perms, vhost, listname.lower())
except:
return 0
- return (details,owners)
+ return (details, owners)
#-------------------------------------------------------------------------------
# owners procedures [ admin.php ]
#
-def mass_subscribe(userdesc,perms,vhost,listname,users):
+def mass_subscribe(userdesc, perms, vhost, listname, users):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if not is_admin_on(userdesc, perms, mlist):
return 0
-
+
members = mlist.getRegularMemberKeys()
added = []
mlist.Lock()
mlist.Unlock()
return added
-def mass_unsubscribe(userdesc,perms,vhost,listname,users):
+def mass_unsubscribe(userdesc, perms, vhost, listname, users):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if not is_admin_on(userdesc, perms, mlist):
return 0
-
+
mlist.Lock()
map(lambda user: mlist.ApprovedDeleteMember(user), users)
mlist.Save()
mlist.Unlock()
return users
-def add_owner(userdesc,perms,vhost,listname,user):
+def add_owner(userdesc, perms, vhost, listname, user):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
mlist.Unlock()
return True
-def del_owner(userdesc,perms,vhost,listname,user):
+def del_owner(userdesc, perms, vhost, listname, user):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
# owners procedures [ admin.php ]
#
-def get_pending_ops(userdesc,perms,vhost,listname):
+def get_pending_ops(userdesc, perms, vhost, listname):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if not is_admin_on(userdesc, perms, mlist):
return 0
-
+
mlist.Lock()
-
+
subs = []
seen = []
dosave = False
except:
mlist.Unlock()
return 0
- return (subs,helds)
+ return (subs, helds)
-def handle_request(userdesc,perms,vhost,listname,id,value,comment):
+def handle_request(userdesc, perms, vhost, listname, id, value, comment):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if not is_admin_on(userdesc, perms, mlist):
return 0
mlist.Lock()
- mlist.HandleRequest(int(id),int(value),comment)
+ mlist.HandleRequest(int(id), int(value), comment)
mlist.Save()
mlist.Unlock()
return 1
return 0
-def get_pending_mail(userdesc,perms,vhost,listname,id,raw=0):
+def get_pending_mail(userdesc, perms, vhost, listname, id, raw=0):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
size = os.path.getsize(fpath)
msg = readMessage(fpath)
mlist.Unlock()
-
+
if raw:
return str(msg)
results = []
- for part in typed_subpart_iterator(msg,'text','plain'):
+ for part in typed_subpart_iterator(msg, 'text', 'plain'):
c = part.get_payload()
if c is not None: results.append (c)
results = map(lambda x: quote(x), results)
'subject_prefix', 'goodbye_msg', 'send_goodbye_msg', 'subscribe_policy', \
'welcome_msg']
-def get_owner_options(userdesc,perms,vhost,listname):
- return get_options(userdesc,perms,vhost,listname.lower(),owner_opts)
+def get_owner_options(userdesc, perms, vhost, listname):
+ return get_options(userdesc, perms, vhost, listname.lower(), owner_opts)
-def set_owner_options(userdesc,perms,vhost,listname,values):
- return set_options(userdesc,perms,vhost,listname.lower(),owner_opts,values)
+def set_owner_options(userdesc, perms, vhost, listname, values):
+ return set_options(userdesc, perms, vhost, listname.lower(), owner_opts, values)
-def add_to_wl(userdesc,perms,vhost,listname,addr):
+def add_to_wl(userdesc, perms, vhost, listname, addr):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
mlist.Unlock()
return 0
-def del_from_wl(userdesc,perms,vhost,listname,addr):
+def del_from_wl(userdesc, perms, vhost, listname, addr):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
mlist.Unlock()
return 0
-def get_bogo_level(userdesc,perms,vhost,listname):
+def get_bogo_level(userdesc, perms, vhost, listname):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
except:
return 0
-def set_bogo_level(userdesc,perms,vhost,listname,level):
+def set_bogo_level(userdesc, perms, vhost, listname, level):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
admin_opts = [ 'advertised', 'archive', \
'max_message_size', 'msg_footer', 'msg_header']
-def get_admin_options(userdesc,perms,vhost,listname):
+def get_admin_options(userdesc, perms, vhost, listname):
if perms != 'admin':
return 0
- return get_options(userdesc,perms,vhost,listname.lower(),admin_opts)
+ return get_options(userdesc, perms, vhost, listname.lower(), admin_opts)
-def set_admin_options(userdesc,perms,vhost,listname,values):
+def set_admin_options(userdesc, perms, vhost, listname, values):
if perms != 'admin':
return 0
- return set_options(userdesc,perms,vhost,listname.lower(),admin_opts,values)
+ return set_options(userdesc, perms, vhost, listname.lower(), admin_opts, values)
#-------------------------------------------------------------------------------
# admin procedures [ check.php ]
'unsubscribe_policy' : 0,
}
-def check_options(userdesc,perms,vhost,listname,correct=False):
+def check_options(userdesc, perms, vhost, listname, correct=False):
try:
- mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(),lock=0)
+ mlist = MailList.MailList(vhost+VHOST_SEP+listname.lower(), lock=0)
except:
return 0
try:
if correct:
mlist.Lock()
options = { }
- for (k,v) in check_opts.iteritems():
+ for (k, v) in check_opts.iteritems():
if mlist.__dict__[k] != v:
- options[k] = v,mlist.__dict__[k]
+ options[k] = v, mlist.__dict__[k]
if correct: mlist.__dict__[k] = v
if mlist.real_name.lower() != listname:
options['real_name'] = listname, mlist.real_name
if correct:
mlist.Save()
mlist.Unlock()
- details = get_list_info(userdesc,perms,mlist)[0]
- return (details,options)
+ details = get_list_info(userdesc, perms, mlist)[0]
+ return (details, options)
except:
if correct: mlist.Unlock()
return 0
# super-admin procedures
#
-def get_all_lists(userdesc,perms,vhost):
+def get_all_lists(userdesc, perms, vhost):
prefix = vhost.lower()+VHOST_SEP
names = Utils.list_names()
names.sort()
for name in names:
if not name.startswith(prefix):
continue
- result.append(name.replace(prefix,''))
+ result.append(name.replace(prefix, ''))
return result
-def create_list(userdesc,perms,vhost,listname,desc,advertise,modlevel,inslevel,owners,members):
+def create_list(userdesc, perms, vhost, listname, desc, advertise, modlevel, inslevel, owners, members):
if perms != 'admin':
return 0
name = vhost.lower()+VHOST_SEP+listname.lower();
if Utils.list_exists(name):
return 0
-
+
owner = []
for o in owners:
email = to_forlife(o)[0]
try:
oldmask = os.umask(002)
pw = sha.new('foobar').hexdigest()
-
+
try:
mlist.Create(name, owner[0], pw)
finally:
mlist.generic_nonmember_action = int(modlevel) > 0
mlist.subscribe_policy = 2 * (int(inslevel) is 1)
mlist.admin_notify_mchanges = (mlist.subscribe_policy or mlist.generic_nonmember_action or mlist.default_member_moderation or not mlist.advertised)
-
+
mlist.owner = owner
-
+
mlist.subject_prefix = '['+listname+'] '
mlist.max_message_size = 0
mlist.msg_footer = "_______________________________________________\n" \
+ "Liste de diffusion %(real_name)s\n"
-
+
mlist.header_filter_rules = []
mlist.header_filter_rules.append(('X-Spam-Flag: Yes, tests=bogofilter', mm_cfg.HOLD, False))
mlist.Unlock()
- check_options(userdesc,perms,vhost,listname.lower(),True)
- mass_subscribe(userdesc,perms,vhost,listname.lower(),members)
+ check_options(userdesc, perms, vhost, listname.lower(), True)
+ mass_subscribe(userdesc, perms, vhost, listname.lower(), members)
# avoid the "-1 mail to moderate" bug
mlist = MailList.MailList(name)
return 0
return 1
-def delete_list(userdesc,perms,vhost,listname,del_archives=0):
+def delete_list(userdesc, perms, vhost, listname, del_archives=0):
lname = vhost+VHOST_SEP+listname.lower()
try:
- mlist = MailList.MailList(lname,lock=0)
+ mlist = MailList.MailList(lname, lock=0)
except:
return 0
try:
except:
return 0
-def kill(userdesc,perms,vhost,alias,del_from_promo):
+def kill(userdesc, perms, vhost, alias, del_from_promo):
exclude = []
if not del_from_promo:
exclude.append(PLATAL_DOMAIN+VHOST_SEP+'promo'+alias[-4:])
for list in Utils.list_names():
if list in exclude: continue
try:
- mlist = MailList.MailList(list,lock=0)
+ mlist = MailList.MailList(list, lock=0)
except:
continue
try:
mlist.Lock()
- mlist.ApprovedDeleteMember(alias+'@'+PLATAL_DOMAIN,None,0,0)
+ mlist.ApprovedDeleteMember(alias+'@'+PLATAL_DOMAIN, None, 0, 0)
mlist.Save()
mlist.Unlock()
except:
# server
#
class FastXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
- allow_reuse_address = True
+ allow_reuse_address = True
################################################################################
#
-# INIT
+# INIT
#
#-------------------------------------------------------------------------------
# use Mailman user and group (not root)
gid = getgrnam(mm_cfg.MAILMAN_GROUP)[2]
if not os.getuid():
- os.setregid(gid,gid)
- os.setreuid(uid,uid)
+ os.setregid(gid, gid)
+ os.setreuid(uid, uid)
signal.signal(signal.SIGHUP, signal.SIG_IGN)