--- /dev/null
+# -*- coding: utf-8 -*-
+
+import functools
+
+from MailMan import UserDesc as mailman_userdesc
+from MailMan import MailList as mailman_mlist
+
+from accounts import models as accounts_models
+
+
+class ListError(Exception):
+ pass
+
+
+class OperationForbidden(ListError):
+ pass
+
+
+class InvalidOperation(ListError):
+ pass
+
+
+def is_mlist_admin(mlist, user):
+ if user.is_superuser or user in mlist.owner:
+ return True
+ if user.memberships.filter(group__dns=self.domain, level=accounts_models.Membership.LEVEL_ADMIN).exists():
+ return True
+ return False
+
+def require_mlist_admin(mlist, user):
+ if not is_mlist_admin(mlist, user):
+ raise OperationForbidden()
+
+
+def lock_mlist(fun):
+ """Lock a mailing list.
+
+ This will:
+ - Open the list, locking it
+ - If the function didn't raise an error, save the list
+ - Unlock the list
+ """
+ @functools.wraps(fun)
+ def decorated(self, *args, **kwargs):
+ mlist = self.get_mlist(lock=True)
+ try:
+ kwargs.update(mlist=mlist)
+ result = fun(self, *args, **kwargs)
+ mlist.Save()
+ return result
+ finally:
+ mlist.Unlock()
+
+ return decorated
+
+
+def with_mlist(fun):
+ @functools.wraps(fun)
+ def decorated(self, *args, **kwargs):
+ mlist = self.get_mlist(lock=False)
+ kwargs.update(mlist=mlist)
+ return fun(self, *args, **kwargs)
+
+ return decorated
+
+
+def require_list_admin(fun):
+ """Ensure the given 'user' is an admin of the 'mlist'."""
+ @functools.wraps(fun)
+ def decorated(*args, **kwargs):
+ user = kwargs['user']
+ mlist = kwargs['mlist']
+ require_mlist_admin(mlist, user)
+
+ return fun(*args, **kwargs)
+
+ return decorated
+
+
+class MailingList(object):
+
+ def __init__(self, mbox, domain, *args, **kwargs):
+ self.domain = domain
+ self.mbox = mbox
+ self.address = '%s@%s' % (self.mbox, self.domain)
+ super(MailingList, self).__init__(*args, **kwargs)
+
+ @property
+ def group(self):
+ try:
+ return accounts_models.XGroup.objects.get(dns=self.domain)
+ except accounts_models.XGroup.DoesNotExist:
+ return None
+
+ # MailMan interactions
+ # ====================
+
+ @property
+ def mailman_name(self):
+ return '%(domain)s_%(mbox)s' % dict(
+ domain=self.domain,
+ mbox=self.mbox,
+ )
+
+ def get_mlist(self, lock=False):
+ """The mailman-style MailList object."""
+ return mailman_mlist.MailList(self.mailman_name, lock=lock)
+
+ def get_userdesc(self, user):
+ return mailman_userdesc.UserDesc(
+ user.email,
+ user.get_full_name(),
+ )
+
+ # Subscribing
+ # ===========
+
+ @lock_mlist
+ def subscribe(self, user, mlist):
+ userdesc = self.get_userdesc(user)
+
+ subscribed = True
+ if mlist.subscribe_policy in (0, 1):
+ # Open or user-confirmed subscription
+ mlist.ApprovedAddMember(userdesc)
+ elif userdesc.address in mlist.owner:
+ mlist.ApprovedAddMember(userdesc)
+ else:
+ # Admin-confirmed subscription
+ mlist.HoldSubscription(userdesc.address, userdesc.fullname,
+ password='', digest='', lang='')
+ subscribed = False
+
+ return subscribed
+
+ @lock_mlist
+ @require_mlist_admin
+ def subscribe_user(self, user, target, mlist):
+ userdesc = self.get_userdesc(target)
+ mlist.ApprovedAddMember(userdesc)
+
+ @lock_mlist
+ def unsubscribe(self, user, mlist):
+ userdesc = self.get_userdesc(user)
+
+ mlist.ApprovedDeleteMember(userdesc.address)
+
+ @lock_mlist
+ @require_mlist_admin
+ def unsubscribe_user(self, user, target, mlist):
+ userdesc = self.get_userdesc(target)
+ mlist.ApprovedDeleteMember(userdesc)
+
+ @lock_mlist
+ @require_mlist_admin
+ def mass_subscribe(self, user, users, mlist):
+ for target in users:
+ userdesc = self.get_userdesc(target)
+ if userdesc.address not in mlist.members:
+ mlist.ApprovedAddMember(userdesc)
+ yield added
+
+ @lock_mlist
+ @require_mlist_admin
+ def mass_unsubscribe(self, user, users, mlist):
+ for target in users:
+ userdesc = self.get_userdesc(target)
+ if userdesc.address in mlist.members:
+ mlist.ApprovedDeleteMember(userdesc)
+ yield target
+
+ @lock_mlist
+ @require_mlist_admin
+ def add_owner(self, user, target, mlist):
+ userdesc = self.get_userdesc(target)
+ mlist.owner.append(userdesc.address)
+
+ @lock_mlist
+ @require_mlist_admin
+ def del_owner(self, user, target, mlist):
+ if len(mlist.owner < 2):
+ # Can't remove last admin
+ raise InvalidOperation(u"Impossible de supprimer le dernier modérateur.")
+
+ userdesc = self.get_userdesc(target)
+ mlist.owner.remove(userdesc.address)
+
+ # Listing
+ # =======
+
+ @with_mlist
+ def get_members(self, user, mlist):
+ return mlist.members
+
+ @classmethod
+ def get_lists(cls, domain):
+ # TODO
+ pass