From 8abdcd05a6c91d561eac7274e8c2c0493bfd5282 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Rapha=C3=ABl=20Barrois?= Date: Sat, 2 Feb 2013 20:23:54 +0100 Subject: [PATCH] lists: first MailingList abstraction. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphaël Barrois --- xnet/lists/__init__.py | 0 xnet/lists/base.py | 198 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 xnet/lists/__init__.py create mode 100644 xnet/lists/base.py diff --git a/xnet/lists/__init__.py b/xnet/lists/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xnet/lists/base.py b/xnet/lists/base.py new file mode 100644 index 0000000..007eced --- /dev/null +++ b/xnet/lists/base.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- + +import functools + +from MailMan import UserDesc as mailman_userdesc +from MailMan import MailList as mailman_mlist + +from accounts import models as accounts_models + + +class ListError(Exception): + pass + + +class OperationForbidden(ListError): + pass + + +class InvalidOperation(ListError): + pass + + +def is_mlist_admin(mlist, user): + if user.is_superuser or user in mlist.owner: + return True + if user.memberships.filter(group__dns=self.domain, level=accounts_models.Membership.LEVEL_ADMIN).exists(): + return True + return False + +def require_mlist_admin(mlist, user): + if not is_mlist_admin(mlist, user): + raise OperationForbidden() + + +def lock_mlist(fun): + """Lock a mailing list. + + This will: + - Open the list, locking it + - If the function didn't raise an error, save the list + - Unlock the list + """ + @functools.wraps(fun) + def decorated(self, *args, **kwargs): + mlist = self.get_mlist(lock=True) + try: + kwargs.update(mlist=mlist) + result = fun(self, *args, **kwargs) + mlist.Save() + return result + finally: + mlist.Unlock() + + return decorated + + +def with_mlist(fun): + @functools.wraps(fun) + def decorated(self, *args, **kwargs): + mlist = self.get_mlist(lock=False) + kwargs.update(mlist=mlist) + return fun(self, *args, **kwargs) + + return decorated + + +def require_list_admin(fun): + """Ensure the given 'user' is an admin of the 'mlist'.""" + @functools.wraps(fun) + def decorated(*args, **kwargs): + user = kwargs['user'] + mlist = kwargs['mlist'] + require_mlist_admin(mlist, user) + + return fun(*args, **kwargs) + + return decorated + + +class MailingList(object): + + def __init__(self, mbox, domain, *args, **kwargs): + self.domain = domain + self.mbox = mbox + self.address = '%s@%s' % (self.mbox, self.domain) + super(MailingList, self).__init__(*args, **kwargs) + + @property + def group(self): + try: + return accounts_models.XGroup.objects.get(dns=self.domain) + except accounts_models.XGroup.DoesNotExist: + return None + + # MailMan interactions + # ==================== + + @property + def mailman_name(self): + return '%(domain)s_%(mbox)s' % dict( + domain=self.domain, + mbox=self.mbox, + ) + + def get_mlist(self, lock=False): + """The mailman-style MailList object.""" + return mailman_mlist.MailList(self.mailman_name, lock=lock) + + def get_userdesc(self, user): + return mailman_userdesc.UserDesc( + user.email, + user.get_full_name(), + ) + + # Subscribing + # =========== + + @lock_mlist + def subscribe(self, user, mlist): + userdesc = self.get_userdesc(user) + + subscribed = True + if mlist.subscribe_policy in (0, 1): + # Open or user-confirmed subscription + mlist.ApprovedAddMember(userdesc) + elif userdesc.address in mlist.owner: + mlist.ApprovedAddMember(userdesc) + else: + # Admin-confirmed subscription + mlist.HoldSubscription(userdesc.address, userdesc.fullname, + password='', digest='', lang='') + subscribed = False + + return subscribed + + @lock_mlist + @require_mlist_admin + def subscribe_user(self, user, target, mlist): + userdesc = self.get_userdesc(target) + mlist.ApprovedAddMember(userdesc) + + @lock_mlist + def unsubscribe(self, user, mlist): + userdesc = self.get_userdesc(user) + + mlist.ApprovedDeleteMember(userdesc.address) + + @lock_mlist + @require_mlist_admin + def unsubscribe_user(self, user, target, mlist): + userdesc = self.get_userdesc(target) + mlist.ApprovedDeleteMember(userdesc) + + @lock_mlist + @require_mlist_admin + def mass_subscribe(self, user, users, mlist): + for target in users: + userdesc = self.get_userdesc(target) + if userdesc.address not in mlist.members: + mlist.ApprovedAddMember(userdesc) + yield added + + @lock_mlist + @require_mlist_admin + def mass_unsubscribe(self, user, users, mlist): + for target in users: + userdesc = self.get_userdesc(target) + if userdesc.address in mlist.members: + mlist.ApprovedDeleteMember(userdesc) + yield target + + @lock_mlist + @require_mlist_admin + def add_owner(self, user, target, mlist): + userdesc = self.get_userdesc(target) + mlist.owner.append(userdesc.address) + + @lock_mlist + @require_mlist_admin + def del_owner(self, user, target, mlist): + if len(mlist.owner < 2): + # Can't remove last admin + raise InvalidOperation(u"Impossible de supprimer le dernier modérateur.") + + userdesc = self.get_userdesc(target) + mlist.owner.remove(userdesc.address) + + # Listing + # ======= + + @with_mlist + def get_members(self, user, mlist): + return mlist.members + + @classmethod + def get_lists(cls, domain): + # TODO + pass -- 2.1.4