Source code for abilian.web.views.object

# coding=utf-8
Class based views
from __future__ import absolute_import, print_function, division
import logging

import sqlalchemy as sa
from flask import (
    g, request, render_template, redirect, url_for, current_app,
    flash, abort
from werkzeug.exceptions import NotFound, BadRequest

from abilian.i18n import _, _l
from import READ, WRITE
from abilian.core.signals import activity
from abilian.core.entities import ValidationError

from .. import nav, csrf, forms
from ..action import ButtonAction, Endpoint, actions
from .base import View, JSONView

logger = logging.getLogger(__name__)

[docs]class BaseObjectView(View): """ Base class common to all database objects views """ #: form title title = None #: Model class Model = None #: primary key name to look for in url arguments pk = 'object_id' #: object instance for this view obj = None #: template to render template = None #: default templates inherit from "base_template". This allows to use generic #: templates with a custom base base_template = "base.html" def __init__(self, Model=None, pk=None, base_template=None, *args, **kwargs): View.__init__(self, *args, **kwargs) cls = self.__class__ = pk if pk is not None else self.Model = Model if Model is not None else cls.Model self.base_template = (base_template if base_template is not None else cls.base_template)
[docs] def prepare_args(self, args, kwargs): args, kwargs = self.init_object(args, kwargs) if self.obj is None: raise NotFound() return args, kwargs
[docs] def breadcrumb(self): """ Return :class:`..nav.BreadcrumbItem` instance for this object. This method may return a list of BreadcrumbItem instances. Return `None` if nothing. """ return None
[docs] def init_object(self, args, kwargs): """ This method is reponsible for setting :attr:`obj`. It is called during :meth:`prepare_args`. """ object_id = kwargs.pop(, None) if object_id is not None: self.obj = self.Model.query.get(object_id) actions.context['object'] = self.obj return args, kwargs
[docs] def get(self, *args, **kwargs): bc = self.breadcrumb() if bc is not None: bc = [bc] if isinstance(bc, nav.BreadcrumbItem) else list(bc) assert all(isinstance(b, nav.BreadcrumbItem) for b in bc) g.breadcrumb.extend(bc) kwargs = {'base_template': self.base_template} kwargs.update(self.template_kwargs) # forbid override "view" kwargs['view'] = self return render_template(self.template, **kwargs)
@property def template_kwargs(self): """ Template render arguments. You can override `base_template` for instance. Only `view` cannot be overriden. """ return {}
[docs]class ObjectView(BaseObjectView): """ View objects """ #: html template template = 'default/object_view.html' #: View form class. Form object used to show objects fields Form = None #: required permission. Must be an instance of #: :class:`` permission = READ #: form instance for this view form = None def __init__(self, Model=None, pk=None, Form=None, template=None, *args, **kwargs): super(ObjectView, self).__init__(Model, pk, *args, **kwargs) cls = self.__class__ self.Form = Form if Form is not None else cls.Form self.template = template if template is not None else cls.template
[docs] def prepare_args(self, args, kwargs): """ :attr:`form` is initialized here. See also :meth:`View.prepare_args`. """ args, kwargs = super(ObjectView, self).prepare_args(args, kwargs) self.form = self.Form(**self.get_form_kwargs()) return args, kwargs
[docs] def get_form_kwargs(self): kw = dict(obj=self.obj) if issubclass(self.Form, forms.Form) and self.permission: kw['permission'] = self.permission return kw
[docs] def index_url(self): return url_for('.index')
[docs] def redirect_to_index(self): return redirect(self.index_url())
@property def template_kwargs(self): """ provides :attr:`form` to templates """ kw = super(ObjectView, self).template_kwargs kw['form'] = self.form return kw
CANCEL_BUTTON = ButtonAction( 'form', 'cancel', title=_l(u'Cancel'), btn_class='default cancel' # .cancel: if jquery.validate is used it will ) # properly skip validation EDIT_BUTTON = ButtonAction('form', 'edit', btn_class='primary', title=_l(u'Save'))
[docs]class ObjectEdit(ObjectView): """ Edit objects """ template = 'default/object_edit.html' decorators = (csrf.support_graceful_failure,) permission = WRITE #: :class:ButtonAction instance to show on form _buttons = () #: submitted form data data = None #: action name from form data action = None #: button clicked, corresponding to :attr:`action`. button = None #: verb used to describe activity activity_verb = 'update' #: UI flash message _message_success = _l(u"Entity successfully edited") def __init__(self, Model=None, pk=None, Form=None, template=None, view_endpoint=None, message_success=None, *args, **kwargs): ObjectView.__init__(self, Model, pk, Form, template=template, *args, **kwargs) self.view_endpoint = (view_endpoint if view_endpoint is not None else '.{}_view'.format(self.Model.__name__)) if message_success: self._message_success = message_success
[docs] def post(self, *args, **kwargs): # conservative: no action submitted -> cancel action ='__action', u'cancel') if action == u'cancel': return self.cancel() return self.handle_action(action)
[docs] def put(self): return
[docs] def prepare_args(self, args, kwargs): args, kwargs = super(ObjectEdit, self).prepare_args(args, kwargs) self._buttons = self.get_form_buttons(*args, **kwargs) = request.form return args, kwargs
[docs] def get_form_buttons(self, *args, **kwargs): return [EDIT_BUTTON, CANCEL_BUTTON]
@property def buttons(self): return (button for button in self._buttons if button.available(actions.context))
[docs] def view_url(self): kw = { } return url_for(self.view_endpoint, **kw)
[docs] def redirect_to_view(self): if self.button: url = self.button.url(actions.context) if url: return redirect(url) return redirect(self.view_url())
[docs] def message_success(self): return unicode(self._message_success) # actions
[docs] def handle_action(self, action): for button in self._buttons: if action == if not button.available(dict(view=self)): raise ValueError('Action "{}" not available' ''.format(action.encode('utf-8'))) break else: raise ValueError('Unknown action: "{}"'.format(action.encode('utf-8'))) self.action = action self.button = button return getattr(self, action)()
[docs] def cancel(self): return self.redirect_to_view()
[docs] def edit(self): if self.validate(): return self.form_valid() else: if request.csrf_failed: errors = self.form.errors csrf_failed = errors.pop('csrf_token', False) if csrf_failed and not errors: # failed only because of invalid/expired csrf, no error on form return self.form_csrf_invalid() resp = self.form_invalid() if resp: return resp flash(_(u"Please fix the error(s) below"), "error") # if we end here then something wrong has happened: show form with error # messages return self.get()
[docs] def before_populate_obj(self): """ This method is called after form has been validated and before calling `form.populate_obj()`. Sometimes one may want to remove a field from the form because it's non-sense to store it on edited object, and use it in a specific manner, for example:: image = form.image del form.image store_image(image) """ pass
[docs] def after_populate_obj(self): """ Called after `self.obj` values have been updated, and `self.obj` attached to an ORM session. """ pass
[docs] def handle_commit_exception(self, exc): """ hook point to handle exception that may happen during commit. It is the responsability of this method to perform a rollback if it is required for handling `exc`. If the method does not handle `exc` if should do nothing and return None. :returns: * a valid :class:`Response` if exception is handled. * `None` if exception is not handled. Default handling happens. """ return None
[docs] def commit_success(self): """ Called after object has been successfully saved to database """
[docs] def validate(self): return self.form.validate()
[docs] def form_valid(self): """ Save object. Called when form is validated. """ session = current_app.db.session() self.before_populate_obj() self.form.populate_obj(self.obj) session.add(self.obj) self.after_populate_obj() try: session.flush() activity.send(self, actor=g.user, verb=self.activity_verb, object=self.obj, target=self.activity_target) session.commit() except ValidationError as e: rv = self.handle_commit_exception(e) if rv is not None: return rv session.rollback() flash(e.message, "error") return self.get() except sa.exc.IntegrityError as e: rv = self.handle_commit_exception(e) if rv is not None: return rv session.rollback() logger.error(e) flash(_(u"An entity with this name already exists in the database."), "error") return self.get() else: self.commit_success() flash(self.message_success(), "success") return self.redirect_to_view()
[docs] def form_invalid(self): """ When a form doesn't validate this method is called. It may return a :class:`Flask.Response` instance, to handle specific errors in custom screens. Else the edit form screen is returned with error(s) highlighted. This method is useful for detecting edition conflict using hidden fields and show a specific screen to help resolve the conflict. """ return None
[docs] def form_csrf_invalid(self): """ Called when a form doesn't validate *only* because of csrf token expiration. This works only if form is an instance of :class:`flask_wtf.form.SecureForm`. Else default CSRF protection (before request) will take place. It must return a valid :class:`Flask.Response` instance. By default it returns to edit form screen with an informative message. """ current_app.extensions['csrf-handler'].flash_csrf_failed_message() return self.get()
@property def activity_target(self): """ Return `target` to use when creating activity. """ return None
CREATE_BUTTON = ButtonAction('form', 'create', btn_class='primary', title=_l(u'Create')) CHAIN_CREATE_BUTTON = ButtonAction( 'form', 'chain_create', btn_class='primary', title=_l(u'Create and add new'), endpoint=lambda ctx: Endpoint(request.endpoint, **request.view_args), condition=lambda ctx: getattr(ctx['view'], 'chain_create_allowed', False) )
[docs]class ObjectCreate(ObjectEdit): """ Create a new object """ activity_verb = 'post' _message_success = _l(u"Entity successfully added") #: set to `True` to show 'Save and add new' button chain_create_allowed = False def __init__(self, *args, **kwargs): chain_create_allowed = kwargs.pop('chain_create_allowed', None) if chain_create_allowed is not None: self.chain_create_allowed = bool(chain_create_allowed) ObjectEdit.__init__(self, *args, **kwargs)
[docs] def init_object(self, args, kwargs): self.obj = self.Model() return args, kwargs
[docs] def get_form_kwargs(self): kw = super(ObjectCreate, self).get_form_kwargs() if request.method == 'GET': # when GET allow form prefill instead of empty/current object data # FIXME: filter allowed parameters on given a field flags (could be # 'allow_from_get'?) kw['formdata'] = request.args return kw
[docs] def get_form_buttons(self, *args, **kwargs): return [CREATE_BUTTON, CHAIN_CREATE_BUTTON, CANCEL_BUTTON]
[docs] def breadcrumb(self): return nav.BreadcrumbItem(label=CREATE_BUTTON.title) # actions
[docs] def create(self): return self.edit()
chain_create = create
[docs] def cancel(self): return self.redirect_to_index()
DELETE_BUTTON = ButtonAction('form', 'delete', title=_l(u'Delete'))
[docs]class ObjectDelete(ObjectEdit): """ Delete object. Supports DELETE verb. """ methods = ['POST'] activity_verb = 'delete' _message_success = _l(u"Entity deleted") init_object = BaseObjectView.init_object
[docs] def get_form_buttons(self, *args, **kwargs): return [DELETE_BUTTON, CANCEL_BUTTON]
[docs] def delete(self): session = current_app.db.session() session.delete(self.obj) activity.send(self, actor=g.user, verb="delete", object=self.obj) session.commit() flash(self.message_success(), 'success') # FIXME: for DELETE verb response in case of success should be 200, 202 # (accepted) or 204 (no content) return self.redirect_to_index()
[docs]class JSONBaseSearch(JSONView): Model = None minimum_input_length = 2 def __init__(self, *args, **kwargs): Model = kwargs.pop('Model', self.Model) minimum_input_length = kwargs.pop('minimum_input_length', self.minimum_input_length) super(JSONBaseSearch, self).__init__(*args, **kwargs) self.Model = Model self.minimum_input_length = minimum_input_length
[docs] def prepare_args(self, args, kwargs): args, kwargs = JSONView.prepare_args(self, args, kwargs) kwargs['q'] = kwargs.get("q", u'').replace(u"%", u" ").lower() return args, kwargs
[docs] def data(self, q, *args, **kwargs): if self.minimum_input_length and len(q) < self.minimum_input_length: raise BadRequest( 'Minimum query length is {:d}'.format(self.minimum_input_length), ) results = [] for obj in self.get_results(q, **kwargs): results.append(self.get_item(obj)) return dict(results=results)
[docs] def get_results(self, q, *args, **kwargs): raise NotImplementedError
[docs] def get_item(self, obj): """ Return a result item :param obj: Instance object :returns: a dictionnary with at least `id` and `text` values """ raise NotImplementedError
[docs]class JSONModelSearch(JSONBaseSearch): """ Base class for json sqlalchemy model search, as used by select2 widgets for example """
[docs] def get_results(self, q, *args, **kwargs): query = self.Model.query query = self.options(query) query = self.filter(query, q, **kwargs) query = self.order_by(query) if not q and not self.minimum_input_length: query = query.limit(50) return query.all()
[docs] def options(self, query): return query.options(sa.orm.noload('*'))
[docs] def filter(self, query, q, **kwargs): if not q: return query return query.filter(sa.func.lower( + "%"))
[docs] def order_by(self, query): return query.order_by(
[docs] def get_label(self, obj): return
[docs] def get_item(self, obj): """ Return a result item :param obj: Instance object :returns: a dictionnary with at least `id` and `text` values """ return dict(, text=self.get_label(obj),
[docs]class JSONWhooshSearch(JSONBaseSearch): """ Base class for JSON Whoosh search, as used by select2 widgets for example """
[docs] def get_results(self, q, *args, **kwargs): svc =['indexing'] search_kwargs = {'limit': 30, 'Models': (self.Model,)} results =, **search_kwargs) return results
[docs] def get_item(self, hit): """ Return a result item :param hit: Hit object from Whoosh :returns: a dictionnary with at least `id` and `text` values """ return dict(id=hit['id'], text=hit['name'], name=hit['name'])