# Source code for abilian.services.security.service

# coding=utf-8
"""
Security service, manages roles and permissions.
"""
from __future__ import absolute_import, print_function, division

from functools import wraps
from itertools import chain

from flask import g, current_app
from flask_login import current_user
import sqlalchemy as sa
from sqlalchemy.orm import subqueryload, object_session
from sqlalchemy import sql

from abilian.core.models.subjects import User, Group
from abilian.core.entities import Entity
from abilian.core.extensions import db
from abilian.core.util import noproxy
from abilian.services import Service, ServiceState
from abilian.services.security.models import (
  SecurityAudit, RoleAssignment, PermissionAssignment,
  Anonymous as AnonymousRole, Authenticated,
  Admin, Manager, Reader, Writer,
  Owner, Creator,
  InheritSecurity, Role, Permission, READ, WRITE, MANAGE,
  PERMISSIONS_ATTR,
)


#: Legacy permission names accepted wherever a :class:`Permission` instance
#: is not supplied.
PERMISSIONS = frozenset(('read', 'write', 'manage'))

__all__ = [
  'security',
  'SecurityError',
  'SecurityService',
  'InheritSecurity',
  'SecurityAudit',
]

#: Default security matrix: for each permission, the roles that hold it when
#: no explicit permission assignment says otherwise.
DEFAULT_PERMISSION_ROLE = {
  MANAGE: frozenset((Admin, Manager)),
  WRITE: frozenset((Admin, Manager, Writer)),
  READ: frozenset((Admin, Manager, Writer, Reader)),
}


class SecurityError(Exception):
  """Base exception for errors raised by the security service."""


class SecurityServiceState(ServiceState):
  """
  Per-application state of the security service.
  """
  #: When True, role lookups go through the per-principal role cache.
  use_cache = True

  #: True if security has changed; tells flush-aware queries that pending
  #: security objects may need to be flushed before querying.
  needs_db_flush = False
def require_flush(fun):
  """
  Decorator for methods that need to query security.

  It ensures all security related operations are flushed to DB, but avoids
  unneeded flushes: a flush is only issued when the service state is marked
  `needs_db_flush`, the session is not already flushing, and the session
  actually holds pending :class:`RoleAssignment` or :class:`SecurityAudit`
  instances. The `needs_db_flush` mark is cleared afterwards.

  :param fun: a service method; its first argument must expose `app_state`.
  :returns: the wrapped method.
  """
  @wraps(fun)
  def ensure_flushed(service, *args, **kwargs):
    if service.app_state.needs_db_flush:
      session = current_app.db.session()
      if (not session._flushing
          and any(isinstance(m, (RoleAssignment, SecurityAudit))
                  for models in (session.new, session.dirty, session.deleted)
                  for m in models)):
        session.flush()
      service.app_state.needs_db_flush = False

    return fun(service, *args, **kwargs)

  return ensure_flushed


def query_pa_no_flush(session, permission, role, obj):
  """
  Query for a :class:`PermissionAssignment` using `session` without any
  `flush()`.

  It works by looking in session `new`, `dirty` and `deleted`, and issuing a
  query with no autoflush.

  .. note::

    This function is used by `add_permission` and `delete_permission` to allow
    to add/remove the same assignment twice without issuing any flush. Since
    :class:`Entity` creates its initial permissions during
    :sqlalchemy:`sqlalchemy.orm.events.SessionEvents.after_attach`, it might be
    problematic to issue a flush when entity is not yet ready to be flushed
    (missing required attributes for example).

  :returns: the matching :class:`PermissionAssignment`, or `None`.
  """
  to_visit = [session.deleted, session.dirty, session.new]

  with session.no_autoflush:
    # no_autoflush is required to visit PERMISSIONS_ATTR without emitting a
    # flush()
    if obj:
      to_visit.append(getattr(obj, PERMISSIONS_ATTR))

    permissions = (
      p for p in chain(*to_visit)
      if isinstance(p, PermissionAssignment)
    )

    for instance in permissions:
      if (instance.permission == permission
          and instance.role == role
          and instance.object == obj):
        return instance

    # last chance: perform a filtered query. If obj is not None, sometimes
    # getattr(obj, PERMISSIONS_ATTR) has objects not present in session and
    # not in this query (maybe in a parent session transaction `new`?).
    return session.query(PermissionAssignment)\
                  .filter(PermissionAssignment.permission == permission,
                          PermissionAssignment.role == role,
                          PermissionAssignment.object == obj)\
                  .first()


# noinspection PyComparisonWithNone
class SecurityService(Service):
  """
  Service managing role assignments, permission assignments and the
  security audit log.
  """
  name = 'security'
  AppStateClass = SecurityServiceState

  def init_app(self, app):
    """
    Register the service on `app` and enable the role cache in its state.
    """
    Service.init_app(self, app)
    state = app.extensions[self.name]
    state.use_cache = True

  def _needs_flush(self):
    """
    Mark next security queries needs DB flush to have up to date information
    """
    self.app_state.needs_db_flush = True

  def clear(self):
    """No-op."""
    pass

  def _current_user_manager(self):
    """
    Returns the current user, or SYSTEM user (id 0) when `g.user` cannot be
    read.
    """
    try:
      return g.user
    except Exception:
      # best effort fallback: `g.user` may be missing, or there may be no
      # application context at all. `Exception` (rather than a bare except)
      # lets KeyboardInterrupt / SystemExit propagate.
      return User.query.get(0)

  # security log
  @require_flush
  def entries_for(self, obj, limit=20):
    """
    Return a query on the `limit` most recent :class:`SecurityAudit` entries
    recorded for `obj`, newest first.

    :param obj: an :class:`Entity`
    :param limit: maximum number of entries
    """
    assert isinstance(obj, Entity)

    return SecurityAudit.query.filter(SecurityAudit.object == obj)\
                              .order_by(SecurityAudit.happened_at.desc())\
                              .limit(limit)

  # inheritance
  def set_inherit_security(self, obj, inherit_security):
    """
    Set `obj.inherit_security` and record the change in the audit log.

    :param obj: an :class:`Entity` that is also an :class:`InheritSecurity`
    :param inherit_security: new value of the flag
    """
    assert isinstance(obj, InheritSecurity)
    assert isinstance(obj, Entity)

    obj.inherit_security = inherit_security
    db.session.add(obj)

    manager = self._current_user_manager()
    op = (SecurityAudit.SET_INHERIT
          if inherit_security
          else SecurityAudit.UNSET_INHERIT)
    audit = SecurityAudit(manager=manager, op=op, object=obj,
                          object_id=obj.id, object_type=obj.entity_type,
                          object_name=obj.name)
    db.session.add(audit)
    self._needs_flush()

  #
  # Roles-related API.
  #
  @require_flush
  def get_roles(self, principal, object=None, no_group_roles=False):
    """
    Gets all the roles attached to given `principal`, on a given `object`.

    :param principal: a :class:`User` or :class:`Group`
    :param object: an :class:`Entity`
    :param no_group_roles: If `True`, return only direct roles, not roles
      acquired through group membership.
    :returns: a list of roles. An anonymous principal only ever gets
      `[AnonymousRole]`.
    """
    assert principal
    if hasattr(principal, 'is_anonymous') and principal.is_anonymous():
      return [AnonymousRole]

    q = db.session.query(RoleAssignment.role)
    if isinstance(principal, Group):
      filter_principal = RoleAssignment.group == principal
    else:
      filter_principal = RoleAssignment.user == principal
      if not no_group_roles:
        # `grp`, not `g`: avoid shadowing flask's `g`
        groups = [grp.id for grp in principal.groups]
        if groups:
          filter_principal |= RoleAssignment.group_id.in_(groups)

    q = q.filter(filter_principal)

    if object is not None:
      assert isinstance(object, Entity)

    q = q.filter(RoleAssignment.object == object)
    roles = {i[0] for i in q.all()}

    if object is not None:
      # Creator / Owner are not stored as assignments: they are derived from
      # the object's attributes
      for attr, role in (('creator', Creator), ('owner', Owner)):
        if getattr(object, attr) == principal:
          roles.add(role)

    return list(roles)

  @require_flush
  def get_principals(self, role, anonymous=True, users=True, groups=True,
                     object=None):
    """
    Return all principals (users and/or groups) which are assigned given
    `role`.

    :param role: a :class:`Role`, or a value accepted by `Role()`
    :param anonymous: if False, ignore assignments made to anonymous
    :param users: if False, ignore assignments made to users
    :param groups: if False, ignore assignments made to groups
    :param object: restrict to assignments on this object (`None`: global
      assignments)
    """
    if not isinstance(role, Role):
      role = Role(role)

    assert role
    assert (users or groups)
    q = RoleAssignment.query.filter_by(role=role)

    if not anonymous:
      q = q.filter(RoleAssignment.anonymous == False)
    if not users:
      q = q.filter(RoleAssignment.user == None)
    elif not groups:
      q = q.filter(RoleAssignment.group == None)

    q = q.filter(RoleAssignment.object == object)
    principals = {(ra.user or ra.group) for ra in q.all()}

    if object is not None and role in (Creator, Owner):
      p = object.creator if role == Creator else object.owner
      if p:
        principals.add(p)

    return list(principals)

  @require_flush
  def _all_roles(self, principal):
    """
    Query every role of `principal` and group them by object.

    For a :class:`User`, roles granted to any of the user's groups are
    included.

    :returns: a dict mapping an object key to a set of roles. The key is
      `None` for global roles, else `u'<entity type>:<object id>'`.
    """
    q = db.session.query(RoleAssignment.object_id, RoleAssignment.role)\
                  .outerjoin(Entity)\
                  .add_columns(Entity._entity_type)

    if isinstance(principal, User):
      filter_cond = (RoleAssignment.user == principal)
      # The membership test must be expressed in SQL. The former
      # `RoleAssignment.group in principal.groups` was evaluated by Python
      # and never produced an IN clause, so group roles were lost.
      group_ids = [grp.id for grp in principal.groups]
      if group_ids:
        filter_cond |= RoleAssignment.group_id.in_(group_ids)

      q = q.filter(filter_cond)
    else:
      q = q.filter(RoleAssignment.group == principal)

    results = q.all()
    all_roles = {}
    for object_id, role, object_type in results:
      if object_id is None:
        object_key = None
      else:
        object_key = u'{}:{}'.format(object_type, object_id)

      all_roles.setdefault(object_key, set()).add(role)

    return all_roles

  def _role_cache(self, principal):
    """
    Return the role cache of `principal`, creating an empty one if missing.
    """
    if not self._has_role_cache(principal):
      # FIXME: should call _fill_role_cache?
      principal.__roles_cache__ = {}

    return principal.__roles_cache__

  def _has_role_cache(self, principal):
    """True if `principal` carries a role cache."""
    return hasattr(principal, "__roles_cache__")

  def _set_role_cache(self, principal, cache):
    """Attach `cache` as the role cache of `principal`."""
    principal.__roles_cache__ = cache

  def _fill_role_cache(self, principal, overwrite=False):
    """
    Fill role cache for `principal` (User or Group), in order to avoid too
    many queries when checking role access with 'has_role'

    Return role_cache of `principal`, or `None` when caching is disabled.
    """
    if not self.app_state.use_cache:
      return None

    if not self._has_role_cache(principal) or overwrite:
      self._set_role_cache(principal, self._all_roles(principal))

    return self._role_cache(principal)

  @require_flush
  def _fill_role_cache_batch(self, principals, overwrite=False):
    """
    Fill role cache for `principals` (Users and/or Groups), in order to avoid
    too many queries when checking role access with 'has_role'

    Groups of the given users are processed too. Unless `overwrite` is set,
    principals that already have a cache are left untouched.
    """
    if not self.app_state.use_cache:
      return

    q = db.session.query(RoleAssignment)
    users = set((u for u in principals if isinstance(u, User)))
    groups = set((grp for grp in principals if isinstance(grp, Group)))
    groups |= set((grp for u in users for grp in u.groups))

    if not overwrite:
      users = set((u for u in users if not self._has_role_cache(u)))
      groups = set((grp for grp in groups if not self._has_role_cache(grp)))

    if not (users or groups):
      return

    # ensure principals processed here will have role cache. Thus users or
    # groups without any role will have an empty role cache, to avoid unneeded
    # individual DB query when calling self._fill_role_cache(p).
    # An explicit loop is used because this is done purely for its side
    # effect.
    for p in chain(users, groups):
      self._set_role_cache(p, {})

    filter_cond = []
    if users:
      filter_cond.append(RoleAssignment.user_id.in_([u.id for u in users]))
    if groups:
      filter_cond.append(
        RoleAssignment.group_id.in_([grp.id for grp in groups]))

    q = q.filter(sql.or_(*filter_cond))
    ra_users = {}
    ra_groups = {}
    for ra in q.all():
      if ra.user:
        all_roles = ra_users.setdefault(ra.user, {})
      else:
        all_roles = ra_groups.setdefault(ra.group, {})

      object_key = (
        u'{}:{:d}'.format(ra.object.entity_type, ra.object_id)
        if ra.object is not None
        else None
      )
      all_roles.setdefault(object_key, set()).add(ra.role)

    # groups first: users' caches are completed with their groups' roles below
    for group, all_roles in ra_groups.iteritems():
      self._set_role_cache(group, all_roles)

    for user, all_roles in ra_users.iteritems():
      for gr in user.groups:
        group_roles = self._fill_role_cache(gr)
        for object_key, roles in group_roles.iteritems():
          obj_roles = all_roles.setdefault(object_key, set())
          obj_roles |= roles

      self._set_role_cache(user, all_roles)

  def has_role(self, principal, role, object=None):
    """
    True if `principal` has `role` (either globally, if `object` is None, or
    on the specific `object`).

    :param principal: a :class:`User`, a :class:`Group` or the anonymous
      principal
    :param role: a :class:`Role` instance, a string, or an iterable of those
    :param object: an :class:`Entity`, or `None`

    Admin and Manager are always accepted in addition to `role`; a global
    Admin and the root user (id 0) always have the role. When the service is
    not running, every truthy principal has every role.

    Note: we're using a cache for efficiency here. TODO: check that we're not
    over-caching.

    Note2: caching could also be moved upfront to when the user is loaded.
    """
    if not principal:
      return False

    principal = noproxy(principal)
    if not self.running:
      return True

    if isinstance(role, (Role, basestring)):
      role = (role,)

    # admin & manager always have role
    valid_roles = frozenset((Admin, Manager) + tuple(role))

    if AnonymousRole in valid_roles:
      # everybody has the role 'Anonymous'
      return True

    if (principal is AnonymousRole
        or (hasattr(principal, 'is_anonymous') and principal.is_anonymous())):
      # anonymous user, and anonymous role isn't in valid_roles
      return False

    # root always have any role
    if isinstance(principal, User) and principal.id == 0:
      return True

    if object:
      assert isinstance(object, Entity)
      object_key = u"{}:{}".format(object.object_type, unicode(object.id))
      if Creator in role:
        if object.creator == principal:
          return True
      if Owner in role:
        if object.owner == principal:
          return True
    else:
      object_key = None

    if self.app_state.use_cache:
      cache = self._fill_role_cache(principal)

      if Admin in cache.get(None, ()):
        # user is a global admin
        return True

      if object_key in cache:
        roles = cache[object_key]
        return len(valid_roles & roles) > 0
      return False

    all_roles = self._all_roles(principal)

    if Admin in all_roles.get(None, ()):
      # user is a global admin
      return True

    roles = all_roles.get(object_key, set())
    return len(valid_roles & roles) > 0

  def grant_role(self, principal, role, obj=None):
    """
    Grants `role` to `principal` (either globally, if `obj` is None, or on
    the specific `obj`).

    Nothing happens if the same assignment already exists, in database or
    pending in the session. Otherwise a :class:`RoleAssignment` and a
    :class:`SecurityAudit` entry are added to the session, and the role cache
    of `principal` is dropped.
    """
    assert principal
    principal = noproxy(principal)
    manager = self._current_user_manager()
    session = object_session(obj) if obj is not None else db.session
    args = dict(role=role, object=obj, anonymous=False, user=None, group=None)

    if (principal is AnonymousRole
        or (hasattr(principal, 'is_anonymous') and principal.is_anonymous())):
      args['anonymous'] = True
    elif isinstance(principal, User):
      args['user'] = principal
    else:
      args['group'] = principal

    if len(RoleAssignment.query.filter_by(**args).limit(1).all()) > 0:
      # role already granted, nothing to do
      return

    # same as above but in current, not yet flushed objects in session. We
    # cannot call flush() in grant_role() since this method may be called a
    # great number of times in the same transaction, and sqlalchemy limits to
    # 100 flushes before triggering a warning
    for ra in (o for models in (session.new, session.dirty) for o in models
               if isinstance(o, RoleAssignment)):
      if all(getattr(ra, attr) == val for attr, val in args.items()):
        return

    ra = RoleAssignment(**args)
    session.add(ra)
    audit = SecurityAudit(manager=manager, op=SecurityAudit.GRANT, **args)
    if obj is not None:
      audit.object_id = obj.id
      audit.object_type = obj.entity_type
      object_name = u''
      # the last attribute present on `obj` wins
      for attr_name in ('name', 'path', '__path_before_delete'):
        if hasattr(obj, attr_name):
          object_name = getattr(obj, attr_name)
      audit.object_name = object_name

    session.add(audit)
    self._needs_flush()

    if hasattr(principal, "__roles_cache__"):
      del principal.__roles_cache__

  def ungrant_role(self, principal, role, object=None):
    """
    Ungrants `role` to `principal` (either globally, if `object` is None, or
    on the specific `object`).

    The matching :class:`RoleAssignment` is fetched with `Query.one()`, so the
    call fails with SQLAlchemy's "no result" / "multiple results" errors when
    there is not exactly one matching assignment. A :class:`SecurityAudit`
    entry is recorded and the role cache of `principal` is dropped.
    """
    assert principal
    principal = noproxy(principal)
    session = object_session(object) if object is not None else db.session
    manager = self._current_user_manager()

    args = dict(role=role, object=object, anonymous=False, user=None,
                group=None)
    q = session.query(RoleAssignment)
    q = q.filter(RoleAssignment.role == role,
                 RoleAssignment.object == object)

    if (principal is AnonymousRole
        or (hasattr(principal, 'is_anonymous') and principal.is_anonymous())):
      args['anonymous'] = True
      # Query.filter() returns a new query: the result must be kept. The
      # row to delete is the one `grant_role` creates for anonymous, i.e.
      # anonymous=True with neither user nor group.
      q = q.filter(RoleAssignment.anonymous == True,
                   RoleAssignment.user == None,
                   RoleAssignment.group == None)
    elif isinstance(principal, User):
      args['user'] = principal
      q = q.filter(RoleAssignment.user == principal)
    else:
      args['group'] = principal
      q = q.filter(RoleAssignment.group == principal)

    ra = q.one()
    session.delete(ra)
    audit = SecurityAudit(manager=manager, op=SecurityAudit.REVOKE, **args)
    session.add(audit)
    self._needs_flush()

    if hasattr(principal, "__roles_cache__"):
      del principal.__roles_cache__

  @require_flush
  def get_role_assignements(self, object):
    """
    Return the role assignments made on `object` (global ones if `None`).

    :returns: a list of `(principal, role)` tuples, where principal is
      `AnonymousRole`, a user or a group.
    """
    session = object_session(object) if object is not None else db.session
    if not session:
      session = db.session()
    q = session.query(RoleAssignment)
    q = q.filter(RoleAssignment.object == object)\
         .options(subqueryload('user.groups'))

    role_assignments = q.all()

    results = []
    for ra in role_assignments:
      if ra.anonymous:
        principal = AnonymousRole
      elif ra.user:
        principal = ra.user
      else:
        principal = ra.group
      results.append((principal, ra.role))
    return results

  #
  # Permission API, currently hardcoded
  #
  def has_permission(self, user, permission, obj=None, inherit=False,
                     roles=None):
    """
    True if `user` has `permission`, globally or on `obj`.

    :param user: the user to check
    :param permission: a :class:`Permission`, or one of the legacy names
      listed in `PERMISSIONS`
    :param obj: target object to check permissions.
    :param inherit: check with permission inheritance. By default, check only
      local roles.
    :param roles: additional valid role or iterable of roles having
      `permission`.

    When the service is not running every permission is granted; the root
    user (id 0) always has every permission.
    """
    if not isinstance(permission, Permission):
      assert permission in PERMISSIONS
      permission = Permission(permission)
    user = noproxy(user)

    if not self.running:
      return True

    session = None
    if obj is not None:
      session = object_session(obj)

    if session is None:
      session = current_app.db.session()

    # root always have any permission
    if isinstance(user, User) and user.id == 0:
      return True

    # valid roles
    # 1: from database
    pa_filter = (PermissionAssignment.object == None)
    if obj is not None:
      pa_filter |= (PermissionAssignment.object == obj)

    pa_filter &= (PermissionAssignment.permission == permission)
    valid_roles = session.query(PermissionAssignment.role)\
                         .filter(pa_filter)
    valid_roles = {res[0] for res in valid_roles.yield_per(1000)}

    # complete with defaults
    valid_roles |= {Admin}  # always have all permissions
    valid_roles |= DEFAULT_PERMISSION_ROLE.get(permission, set())

    # FIXME: obj.__class__ could define default permission matrix too

    if roles is not None:
      if isinstance(roles, (Role, bytes, unicode)):
        roles = (roles,)

      for r in roles:
        valid_roles.add(Role(r))

    # FIXME: query permission_role: global and on object

    if AnonymousRole in valid_roles:
      return True

    if Authenticated in valid_roles and not user.is_anonymous():
      return True

    # first test global roles, then object local roles
    checked_objs = [None, obj]

    if inherit and obj is not None:
      # walk up the parents for as long as security is inherited
      while (obj.inherit_security and obj.parent is not None):
        obj = obj.parent
        checked_objs.append(obj)

    principals = [user] + list(user.groups)
    self._fill_role_cache_batch(principals)

    return any((self.has_role(principal, valid_roles, item)
                for principal in principals
                for item in checked_objs))

  def query_entity_with_permission(self, permission, user=None, Model=Entity):
    """
    Filter a query on an :class:`Entity` or one of its subclasses.

    Usage::

        read_q = security.query_entity_with_permission(READ, Model=MyModel)
        MyModel.query.filter(read_q)

    It should always be placed before any `.join()` happens in the query; else
    sqlalchemy might join to the "wrong" entity table when joining to other
    :class:`Entity`.

    NOTE(review): the usage hint above is not established by this file;
    confirm against callers.

    :param user: user to filter for. Default: `current_user`.

    :param permission: required :class:`Permission`

    :param Model: An :class:`Entity` based class. Useful when there is more
      than one Entity based object in query, or if an alias should be used.

    :returns: a `sqlalchemy.sql.exists()` expression.
    """
    assert isinstance(permission, Permission)
    assert issubclass(Model, Entity)
    RA = sa.orm.aliased(RoleAssignment)
    PA = sa.orm.aliased(PermissionAssignment)
    id_column = Model.id
    creator = Model.creator
    owner = Model.owner

    if not self.running:
      return sa.sql.exists([1])

    if user is None:
      user = current_user._get_current_object()

    # build clause: role EXISTS
    principal_filter = (RA.anonymous == True)

    if not user.is_anonymous():
      principal_filter |= (RA.user == user)

    if user.groups:
      principal_filter |= RA.group_id.in_([grp.id for grp in user.groups])

    # role_exists: find for each permission row if user has one of required
    # role, local or global
    role_exists = \
      sa.sql.exists([1])\
            .where(
              sa.sql.and_(
                (RA.role == PA.role),
                (RA.object_id == PA.object_id) | (RA.object_id == None),
                principal_filter))

    permission_exists = \
      sa.sql.exists([1])\
            .where(
              sa.sql.and_(
                PA.permission == permission,
                PA.object_id == id_column,
                role_exists,
              ))

    # is_admin: self-explanatory. It search for local or global admin
    # role, but PermissionAssignment is not involved, thus it can match on
    # entities that don't have *any permission assignment*, whereas previous
    # expressions cannot.
    is_admin = \
      sa.sql.exists([1])\
            .where(
              sa.sql.and_(
                RA.role == Admin,
                (RA.object_id == id_column) | (RA.object_id == None),
                principal_filter,
              ))

    filter_expr = permission_exists | is_admin

    if user and not user.is_anonymous():
      is_owner_or_creator = \
        sa.sql.exists([1])\
              .where(
                sa.sql.and_(
                  PA.permission == permission,
                  PA.object_id == id_column,
                  sa.sql.or_((PA.role == Owner) & (owner == user),
                             (PA.role == Creator) & (creator == user)),
                ))
      filter_expr |= is_owner_or_creator

    return filter_expr

  def get_permissions_assignments(self, obj=None, permission=None):
    """
    Return the permission assignments made on `obj` (global ones if `None`).

    :param obj: an :class:`Entity`, or `None`
    :param permission: return only roles having this permission

    :returns: a dict where keys are `permissions` and values `roles` iterable.
    """
    session = None
    if obj is not None:
      assert isinstance(obj, Entity)
      session = object_session(obj)

    if session is None:
      session = current_app.db.session()

    pa = session.query(PermissionAssignment.permission,
                       PermissionAssignment.role)\
                .filter(PermissionAssignment.object == obj)

    if permission:
      pa = pa.filter(PermissionAssignment.permission == permission)

    results = {}
    for permission, role in pa.yield_per(1000):
      results.setdefault(permission, set()).add(role)

    return results

  def add_permission(self, permission, role, obj=None):
    """
    Give `permission` to `role`, globally or on `obj`, without flushing.

    An existing or pending assignment is reused rather than duplicated.
    """
    session = None
    if obj is not None:
      session = object_session(obj)

    if session is None:
      session = current_app.db.session()

    pa = query_pa_no_flush(session, permission, role, obj)

    if not pa:
      pa = PermissionAssignment(permission=permission, role=role, object=obj)

    # do it in any case: it could have been found in session.deleted
    session.add(pa)

  def delete_permission(self, permission, role, obj=None):
    """
    Remove `permission` from `role`, globally or on `obj`, without flushing.

    Nothing happens if no such assignment is found.
    """
    session = None
    if obj is not None:
      session = object_session(obj)

    if session is None:
      session = current_app.db.session()

    pa = query_pa_no_flush(session, permission, role, obj)

    if pa:
      session.delete(pa)

  def filter_with_permission(self, user, permission, obj_list, inherit=False):
    """
    Return the items of `obj_list` on which `user` has `permission`.
    """
    user = noproxy(user)
    return [
      obj for obj in obj_list
      if self.has_permission(user, permission, obj, inherit)
    ]


# Instantiate the service
security = SecurityService()