lists: first MailingList abstraction.
authorRaphaël Barrois <raphael.barrois@polytechnique.org>
Sat, 2 Feb 2013 19:23:54 +0000 (20:23 +0100)
committerRaphaël Barrois <raphael.barrois@polytechnique.org>
Sat, 2 Feb 2013 19:23:56 +0000 (20:23 +0100)
Signed-off-by: Raphaël Barrois <raphael.barrois@polytechnique.org>
xnet/lists/__init__.py [new file with mode: 0644]
xnet/lists/base.py [new file with mode: 0644]

diff --git a/xnet/lists/__init__.py b/xnet/lists/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/xnet/lists/base.py b/xnet/lists/base.py
new file mode 100644 (file)
index 0000000..007eced
--- /dev/null
@@ -0,0 +1,198 @@
+# -*- coding: utf-8 -*-
+
+import functools
+
+from MailMan import UserDesc as mailman_userdesc
+from MailMan import MailList as mailman_mlist
+
+from accounts import models as accounts_models
+
+
+class ListError(Exception):
+    pass
+
+
+class OperationForbidden(ListError):
+    pass
+
+
+class InvalidOperation(ListError):
+    pass
+
+
+def is_mlist_admin(mlist, user):
+    if user.is_superuser or user in mlist.owner:
+        return True
+    if user.memberships.filter(group__dns=self.domain, level=accounts_models.Membership.LEVEL_ADMIN).exists():
+        return True
+    return False
+
+def require_mlist_admin(mlist, user):
+    if not is_mlist_admin(mlist, user):
+        raise OperationForbidden()
+
+
+def lock_mlist(fun):
+    """Lock a mailing list.
+
+    This will:
+    - Open the list, locking it
+    - If the function didn't raise an error, save the list
+    - Unlock the list
+    """
+    @functools.wraps(fun)
+    def decorated(self, *args, **kwargs):
+        mlist = self.get_mlist(lock=True)
+        try:
+            kwargs.update(mlist=mlist)
+            result = fun(self, *args, **kwargs)
+            mlist.Save()
+            return result
+        finally:
+            mlist.Unlock()
+
+    return decorated
+
+
+def with_mlist(fun):
+    @functools.wraps(fun)
+    def decorated(self, *args, **kwargs):
+        mlist = self.get_mlist(lock=False)
+        kwargs.update(mlist=mlist)
+        return fun(self, *args, **kwargs)
+
+    return decorated
+
+
+def require_list_admin(fun):
+    """Ensure the given 'user' is an admin of the 'mlist'."""
+    @functools.wraps(fun)
+    def decorated(*args, **kwargs):
+        user = kwargs['user']
+        mlist = kwargs['mlist']
+        require_mlist_admin(mlist, user)
+
+        return fun(*args, **kwargs)
+
+    return decorated
+
+
+class MailingList(object):
+
+    def __init__(self, mbox, domain, *args, **kwargs):
+        self.domain = domain
+        self.mbox = mbox
+        self.address = '%s@%s' % (self.mbox, self.domain)
+        super(MailingList, self).__init__(*args, **kwargs)
+
+    @property
+    def group(self):
+        try:
+            return accounts_models.XGroup.objects.get(dns=self.domain)
+        except accounts_models.XGroup.DoesNotExist:
+            return None
+
+    # MailMan interactions
+    # ====================
+
+    @property
+    def mailman_name(self):
+        return '%(domain)s_%(mbox)s' % dict(
+            domain=self.domain,
+            mbox=self.mbox,
+        )
+
+    def get_mlist(self, lock=False):
+        """The mailman-style MailList object."""
+        return mailman_mlist.MailList(self.mailman_name, lock=lock)
+
+    def get_userdesc(self, user):
+        return mailman_userdesc.UserDesc(
+            user.email,
+            user.get_full_name(),
+        )
+
+    # Subscribing
+    # ===========
+
+    @lock_mlist
+    def subscribe(self, user, mlist):
+        userdesc = self.get_userdesc(user)
+
+        subscribed = True
+        if mlist.subscribe_policy in (0, 1):
+            # Open or user-confirmed subscription
+            mlist.ApprovedAddMember(userdesc)
+        elif userdesc.address in mlist.owner:
+            mlist.ApprovedAddMember(userdesc)
+        else:
+            # Admin-confirmed subscription
+            mlist.HoldSubscription(userdesc.address, userdesc.fullname,
+                password='', digest='', lang='')
+            subscribed = False
+
+        return subscribed
+
+    @lock_mlist
+    @require_mlist_admin
+    def subscribe_user(self, user, target, mlist):
+        userdesc = self.get_userdesc(target)
+        mlist.ApprovedAddMember(userdesc)
+
+    @lock_mlist
+    def unsubscribe(self, user, mlist):
+        userdesc = self.get_userdesc(user)
+
+        mlist.ApprovedDeleteMember(userdesc.address)
+
+    @lock_mlist
+    @require_mlist_admin
+    def unsubscribe_user(self, user, target, mlist):
+        userdesc = self.get_userdesc(target)
+        mlist.ApprovedDeleteMember(userdesc)
+
+    @lock_mlist
+    @require_mlist_admin
+    def mass_subscribe(self, user, users, mlist):
+        for target in users:
+            userdesc = self.get_userdesc(target)
+            if userdesc.address not in mlist.members:
+                mlist.ApprovedAddMember(userdesc)
+                yield added
+
+    @lock_mlist
+    @require_mlist_admin
+    def mass_unsubscribe(self, user, users, mlist):
+        for target in users:
+            userdesc = self.get_userdesc(target)
+            if userdesc.address in mlist.members:
+                mlist.ApprovedDeleteMember(userdesc)
+                yield target
+
+    @lock_mlist
+    @require_mlist_admin
+    def add_owner(self, user, target, mlist):
+        userdesc = self.get_userdesc(target)
+        mlist.owner.append(userdesc.address)
+
+    @lock_mlist
+    @require_mlist_admin
+    def del_owner(self, user, target, mlist):
+        if len(mlist.owner < 2):
+            # Can't remove last admin
+            raise InvalidOperation(u"Impossible de supprimer le dernier modérateur.")
+
+        userdesc = self.get_userdesc(target)
+        mlist.owner.remove(userdesc.address)
+
+    # Listing
+    # =======
+
+    @with_mlist
+    def get_members(self, user, mlist):
+        return mlist.members
+
+    @classmethod
+    def get_lists(cls, domain):
+        # TODO
+        pass