Move xnet/accounts to xnet/groups
[xnet] / xnet / lists / base.py
1 # -*- coding: utf-8 -*-
2
3 import functools
4
5 from MailMan import UserDesc as mailman_userdesc
6 from MailMan import MailList as mailman_mlist
7
8 from groups import models as groups_models
9
10
11 class ListError(Exception):
12 pass
13
14
15 class OperationForbidden(ListError):
16 pass
17
18
19 class InvalidOperation(ListError):
20 pass
21
22
23 def is_mlist_admin(mlist, user):
24 if user.is_superuser or user in mlist.owner:
25 return True
26 if user.memberships.filter(group__dns=self.domain, level=groups_models.Membership.LEVEL_ADMIN).exists():
27 return True
28 return False
29
30 def require_mlist_admin(mlist, user):
31 if not is_mlist_admin(mlist, user):
32 raise OperationForbidden()
33
34
35 def lock_mlist(fun):
36 """Lock a mailing list.
37
38 This will:
39 - Open the list, locking it
40 - If the function didn't raise an error, save the list
41 - Unlock the list
42 """
43 @functools.wraps(fun)
44 def decorated(self, *args, **kwargs):
45 mlist = self.get_mlist(lock=True)
46 try:
47 kwargs.update(mlist=mlist)
48 result = fun(self, *args, **kwargs)
49 mlist.Save()
50 return result
51 finally:
52 mlist.Unlock()
53
54 return decorated
55
56
57 def with_mlist(fun):
58 @functools.wraps(fun)
59 def decorated(self, *args, **kwargs):
60 mlist = self.get_mlist(lock=False)
61 kwargs.update(mlist=mlist)
62 return fun(self, *args, **kwargs)
63
64 return decorated
65
66
67 def require_list_admin(fun):
68 """Ensure the given 'user' is an admin of the 'mlist'."""
69 @functools.wraps(fun)
70 def decorated(*args, **kwargs):
71 user = kwargs['user']
72 mlist = kwargs['mlist']
73 require_mlist_admin(mlist, user)
74
75 return fun(*args, **kwargs)
76
77 return decorated
78
79
80 class MailingList(object):
81
82 def __init__(self, mbox, domain, *args, **kwargs):
83 self.domain = domain
84 self.mbox = mbox
85 self.address = '%s@%s' % (self.mbox, self.domain)
86 super(MailingList, self).__init__(*args, **kwargs)
87
88 @property
89 def group(self):
90 try:
91 return groups_models.XGroup.objects.get(dns=self.domain)
92 except groups_models.XGroup.DoesNotExist:
93 return None
94
95 # MailMan interactions
96 # ====================
97
98 @property
99 def mailman_name(self):
100 return '%(domain)s_%(mbox)s' % dict(
101 domain=self.domain,
102 mbox=self.mbox,
103 )
104
105 def get_mlist(self, lock=False):
106 """The mailman-style MailList object."""
107 return mailman_mlist.MailList(self.mailman_name, lock=lock)
108
109 def get_userdesc(self, user):
110 return mailman_userdesc.UserDesc(
111 user.email,
112 user.get_full_name(),
113 )
114
115 # Subscribing
116 # ===========
117
118 @lock_mlist
119 def subscribe(self, user, mlist):
120 userdesc = self.get_userdesc(user)
121
122 subscribed = True
123 if mlist.subscribe_policy in (0, 1):
124 # Open or user-confirmed subscription
125 mlist.ApprovedAddMember(userdesc)
126 elif userdesc.address in mlist.owner:
127 mlist.ApprovedAddMember(userdesc)
128 else:
129 # Admin-confirmed subscription
130 mlist.HoldSubscription(userdesc.address, userdesc.fullname,
131 password='', digest='', lang='')
132 subscribed = False
133
134 return subscribed
135
136 @lock_mlist
137 @require_mlist_admin
138 def subscribe_user(self, user, target, mlist):
139 userdesc = self.get_userdesc(target)
140 mlist.ApprovedAddMember(userdesc)
141
142 @lock_mlist
143 def unsubscribe(self, user, mlist):
144 userdesc = self.get_userdesc(user)
145
146 mlist.ApprovedDeleteMember(userdesc.address)
147
148 @lock_mlist
149 @require_mlist_admin
150 def unsubscribe_user(self, user, target, mlist):
151 userdesc = self.get_userdesc(target)
152 mlist.ApprovedDeleteMember(userdesc)
153
154 @lock_mlist
155 @require_mlist_admin
156 def mass_subscribe(self, user, users, mlist):
157 for target in users:
158 userdesc = self.get_userdesc(target)
159 if userdesc.address not in mlist.members:
160 mlist.ApprovedAddMember(userdesc)
161 yield added
162
163 @lock_mlist
164 @require_mlist_admin
165 def mass_unsubscribe(self, user, users, mlist):
166 for target in users:
167 userdesc = self.get_userdesc(target)
168 if userdesc.address in mlist.members:
169 mlist.ApprovedDeleteMember(userdesc)
170 yield target
171
172 @lock_mlist
173 @require_mlist_admin
174 def add_owner(self, user, target, mlist):
175 userdesc = self.get_userdesc(target)
176 mlist.owner.append(userdesc.address)
177
178 @lock_mlist
179 @require_mlist_admin
180 def del_owner(self, user, target, mlist):
181 if len(mlist.owner < 2):
182 # Can't remove last admin
183 raise InvalidOperation(u"Impossible de supprimer le dernier modérateur.")
184
185 userdesc = self.get_userdesc(target)
186 mlist.owner.remove(userdesc.address)
187
188 # Listing
189 # =======
190
191 @with_mlist
192 def get_members(self, user, mlist):
193 return mlist.members
194
195 @classmethod
196 def get_lists(cls, domain):
197 # TODO
198 pass