
# coding=utf-8
"""
Indexing service for Abilian.

Adds Whoosh indexing capabilities to SQLAlchemy models.

Based on Flask-whooshalchemy by Karl Gyllstrom.

:copyright: (c) 2013-2014 by Abilian SAS
:copyright: (c) 2012 by Stefane Fermigier
:copyright: (c) 2012 by Karl Gyllstrom
:license: BSD (see LICENSE.txt)
"""
from __future__ import absolute_import, print_function, division

import os
import logging
from inspect import isclass

from flask import current_app, g, _app_ctx_stack, appcontext_pushed
from flask_login import current_user
from flask.globals import _lookup_app_object
from celery import shared_task
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.orm.session import Session
import whoosh.index
from whoosh.collectors import WrappingCollector
from whoosh.filedb.filestore import RamStorage, FileStorage
from whoosh.writing import AsyncWriter, CLEAR
from whoosh.qparser import DisMaxParser
from whoosh.analysis import StemmingAnalyzer, CharsetFilter
import whoosh.query as wq
from whoosh.support.charset import accent_map

from abilian.services import Service, ServiceState
from abilian.services.security import Role, Anonymous, Authenticated, security
from abilian.core import signals
from abilian.core.models.subjects import User, Group
from abilian.core.util import fqcn as base_fqcn, friendly_fqcn
from abilian.core.entities import Entity, Indexable
from abilian.core.extensions import db
from .adapter import SAAdapter
from .schema import DefaultSearchSchema, indexable_role

logger = logging.getLogger(__name__)
_TEXT_ANALYZER = StemmingAnalyzer() | CharsetFilter(accent_map)

_pending_indexation_attr = 'abilian_pending_indexation'

# as of whoosh 2.5.7, a method is missing on WrappingCollector. See
# https://bitbucket.org/mchaput/whoosh/issue/394/error-when-searching-with-groupedby-and
_PATCHED = False

if not _PATCHED:
  def wrapping_collector_remove(self, global_docnum):
    return self.child.remove(global_docnum)

  from abilian.core.logging import patch_logger
  patch_logger.info(WrappingCollector.remove)
  WrappingCollector.remove = wrapping_collector_remove
  _PATCHED = True
  del patch_logger
  del wrapping_collector_remove
# END PATCH

def url_for_hit(hit, default=u'#'):
  """
  Helper for building URLs from results
  """
  object_type = hit['object_type']
  object_id = int(hit['id'])
  try:
    return current_app.default_view.url_for(hit, object_type, object_id)
  except KeyError:
    return default
  except Exception:
    logger.error('Error building URL for search result', exc_info=True)
    return default


def fqcn(cls):
  if issubclass(cls, Entity):
    return cls.entity_type
  return base_fqcn(cls)


class IndexServiceState(ServiceState):
  whoosh_base = None
  indexes = None
  indexed_classes = None
  indexed_fqcn = None
  search_filter_funcs = None
  value_provider_funcs = None
  url_for_hit = None

  def __init__(self, *args, **kwargs):
    ServiceState.__init__(self, *args, **kwargs)
    self.indexes = {}
    self.indexed_classes = set()
    self.indexed_fqcn = set()
    self.search_filter_funcs = []
    self.value_provider_funcs = []
    self.url_for_hit = url_for_hit

  @property
  def to_update(self):
    return _lookup_app_object(_pending_indexation_attr)

  @to_update.setter
  def to_update(self, value):
    top = _app_ctx_stack.top
    if top is None:
      raise RuntimeError('working outside of application context')

    setattr(top, _pending_indexation_attr, value)


class WhooshIndexService(Service):
  """
  Index documents using Whoosh.
  """
  name = 'indexing'
  AppStateClass = IndexServiceState

  _listening = False

  def __init__(self, *args, **kwargs):
    Service.__init__(self, *args, **kwargs)
    self.adapters_cls = [SAAdapter]
    self.adapted = {}
    self.schemas = {'default': DefaultSearchSchema()}

  def init_app(self, app):
    Service.init_app(self, app)
    state = app.extensions[self.name]

    whoosh_base = app.config.get("WHOOSH_BASE")
    if not whoosh_base:
      whoosh_base = "whoosh"  # default value

    if not os.path.isabs(whoosh_base):
      whoosh_base = os.path.join(app.instance_path, whoosh_base)
    state.whoosh_base = os.path.abspath(whoosh_base)

    if not self._listening:
      event.listen(Session, "after_flush", self.after_flush)
      event.listen(Session, "after_commit", self.after_commit)
      self._listening = True

    appcontext_pushed.connect(self.clear_update_queue, app)
    signals.register_js_api.connect(self._do_register_js_api)
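
  # Configuration sketch (illustrative, not part of the original module):
  # WHOOSH_BASE controls where on-disk indexes live; relative paths are
  # resolved against the application's instance_path, e.g.:
  #
  #   WHOOSH_BASE = 'whoosh'                  # -> <instance_path>/whoosh
  #   WHOOSH_BASE = '/var/lib/myapp/whoosh'   # absolute path, used as-is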

  def _do_register_js_api(self, sender):
    app = sender
    js_api = app.js_api.setdefault('search', {})
    js_api['object_types'] = self.searchable_object_types()

  def register_search_filter(self, func):
    """
    Register a function that returns a query used for filtering search
    results. This query is And'ed with the other filters.

    If no filtering should be performed, the function must return None.
    """
    self.app_state.search_filter_funcs.append(func)
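
  # Example (hypothetical, for illustration only): a custom filter may
  # restrict results on any schema field; here a 'creator' field is
  # assumed to exist in the schema. Returning None skips filtering:
  #
  #   def only_own_documents():
  #     if current_user.is_anonymous():
  #       return None
  #     return wq.Term('creator', indexable_role(current_user))
  #
  #   service.register_search_filter(only_own_documents)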

  def register_value_provider(self, func):
    """
    Register a function that may alter the content of an indexable
    document.

    It is used in :meth:`get_document` and called after the adapter has
    built the document. The function must accept (document, obj) as
    arguments and return the new document object.
    """
    self.app_state.value_provider_funcs.append(func)
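
  # Example (hypothetical): a value provider that normalizes the 'text'
  # field before indexing; `strip_markup` is an assumed helper, not part
  # of this module. The provider receives the document built by the
  # adapter and the source object, and returns the new document:
  #
  #   def normalize_text(document, obj):
  #     if 'text' in document:
  #       document['text'] = strip_markup(document['text'])
  #     return document
  #
  #   service.register_value_provider(normalize_text)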

  def clear_update_queue(self, app=None):
    self.app_state.to_update = []

  def start(self):
    Service.start(self)
    self.register_classes()
    self.init_indexes()
    self.clear_update_queue()

  def init_indexes(self):
    """
    Create indexes for schemas.
    """
    state = self.app_state

    for name, schema in self.schemas.iteritems():
      if current_app.testing:
        storage = TestingStorage()
      else:
        index_path = os.path.join(state.whoosh_base, name)
        if not os.path.exists(index_path):
          os.makedirs(index_path)
        storage = FileStorage(index_path)

      FileIndex = whoosh.index.FileIndex
      if not storage.index_exists(name):
        FileIndex = whoosh.index.FileIndex.create

      index = FileIndex(storage, schema, name)
      state.indexes[name] = index

  def clear(self):
    """
    Remove all content from indexes, and unregister all classes.

    After clear() the service is stopped. It must be started again to
    create new indexes and register classes.
    """
    logger.info('Resetting indexes')
    state = self.app_state

    for name, idx in state.indexes.iteritems():
      writer = AsyncWriter(idx)
      writer.commit(merge=True, optimize=True, mergetype=CLEAR)

    state.indexes = {}
    state.indexed_classes = set()
    state.indexed_fqcn = set()
    self.clear_update_queue()

    if self.running:
      self.stop()

  def index(self, name='default'):
    return self.app_state.indexes[name]

  @property
  def default_search_fields(self):
    """
    Return default field names and boosts to be used for searching.

    Can be configured with `SEARCH_DEFAULT_BOOSTS`.
    """
    config = current_app.config.get('SEARCH_DEFAULT_BOOSTS')
    if not config:
      config = dict(
          name=1.5,
          name_prefix=1.3,
          description=1.3,
          text=1.0,
      )
    return config
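
  # Configuration sketch: the default boosts above can be overridden in
  # the application config, e.g.:
  #
  #   SEARCH_DEFAULT_BOOSTS = dict(name=2.0, description=1.2, text=1.0)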

  def searchable_object_types(self):
    """
    Return a list of (object_type, friendly name) pairs present in the
    index.
    """
    try:
      idx = self.index()
    except KeyError:
      # index does not exist: the service was never started. May happen
      # during tests.
      return []

    with idx.reader() as r:
      indexed = sorted(set(r.field_terms('object_type')))

    app_indexed = self.app_state.indexed_fqcn
    return [(name, friendly_fqcn(name)) for name in indexed
            if name in app_indexed]

  def search(self, q, index='default', fields=None, Models=(),
             object_types=(), prefix=True, facet_by_type=None,
             **search_args):
    """
    Interface to search indexes.

    :param q: unparsed search string.
    :param index: name of index to use for search.
    :param fields: optional mapping of field name -> boost factor.
    :param Models: list of Model classes to limit search on.
    :param object_types: same as `Models`, but as model type strings.
    :param prefix: enable or disable search by prefix.
    :param facet_by_type: if set, returns a dict of object_type: results,
        with a maximum of 5 matches per type.
    :param search_args: any valid parameter for
        :meth:`whoosh.searching.Searcher.search`. This includes `limit`,
        `groupedby` and `sortedby`.
    """
    index = self.app_state.indexes[index]
    if not fields:
      fields = self.default_search_fields

    valid_fields = set(f for f in index.schema.names(check_names=fields)
                       if prefix or not f.endswith('_prefix'))

    for invalid in set(fields) - valid_fields:
      del fields[invalid]

    parser = DisMaxParser(fields, index.schema)
    query = parser.parse(q)

    filters = search_args.setdefault('filter', None)
    filters = [filters] if filters is not None else []
    del search_args['filter']

    if not hasattr(g, 'is_manager') or not g.is_manager:
      # security access filter
      user = current_user
      roles = {indexable_role(user)}
      if not user.is_anonymous():
        roles.add(indexable_role(Anonymous))
        roles.add(indexable_role(Authenticated))
        roles |= set(indexable_role(r) for r in security.get_roles(user))

      filter_q = wq.Or([wq.Term('allowed_roles_and_users', role)
                        for role in roles])
      filters.append(filter_q)

    object_types = set(object_types)
    for m in Models:
      object_type = m.entity_type
      if not object_type:
        continue
      object_types.add(object_type)

    if object_types:
      object_types &= self.app_state.indexed_fqcn
    else:
      # ensure we don't show content types previously indexed but not yet
      # cleaned from the index
      object_types = self.app_state.indexed_fqcn

    # limit object_type
    filter_q = wq.Or([wq.Term('object_type', t) for t in object_types])
    filters.append(filter_q)

    for func in self.app_state.search_filter_funcs:
      filter_q = func()
      if filter_q is not None:
        filters.append(filter_q)

    if filters:
      filter_q = wq.And(filters) if len(filters) > 1 else filters[0]
      # search_args['filter'] = filter_q
      query = filter_q & query

    if facet_by_type:
      if not object_types:
        object_types = [t[0] for t in self.searchable_object_types()]

      # limit number of documents to score, per object type
      collapse_limit = 5
      search_args['groupedby'] = 'object_type'
      search_args['collapse'] = 'object_type'
      search_args['collapse_limit'] = collapse_limit
      search_args['limit'] = (search_args['collapse_limit']
                              * max(len(object_types), 1))

    with index.searcher(closereader=False) as searcher:
      # 'closereader' is needed, else results cannot be used outside the
      # 'with' statement
      results = searcher.search(query, **search_args)

      if facet_by_type:
        positions = {doc_id: pos
                     for pos, doc_id
                     in enumerate(i[1] for i in results.top_n)}
        sr = results
        results = {}
        for typename, doc_ids in sr.groups('object_type').items():
          results[typename] = [sr[positions[oid]]
                               for oid in doc_ids[:collapse_limit]]

      return results
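
  # Usage sketch (assuming an application context and a started service;
  # `service` is the module-level instance defined at the bottom of this
  # file):
  #
  #   results = service.search(u'annual report', limit=10)
  #   for hit in results:
  #     print(hit['object_type'], url_for_hit(hit))
  #
  #   # faceted: a dict mapping object_type -> list of matches
  #   by_type = service.search(u'annual report', facet_by_type=True)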

  def search_for_class(self, query, cls, index='default', **search_args):
    return self.search(query, Models=(fqcn(cls),), index=index,
                       **search_args)

  def register_classes(self):
    state = self.app_state
    classes = (cls for cls in db.Model._decl_class_registry.values()
               if isclass(cls)
               and issubclass(cls, Indexable)
               and cls.__indexable__)
    for cls in classes:
      if cls not in state.indexed_classes:
        self.register_class(cls, app_state=state)

  def register_class(self, cls, app_state=None):
    """
    Register a model class.
    """
    state = app_state if app_state is not None else self.app_state

    for Adapter in self.adapters_cls:
      if Adapter.can_adapt(cls):
        break
    else:
      return

    cls_fqcn = fqcn(cls)
    self.adapted[cls_fqcn] = Adapter(cls, self.schemas['default'])
    state.indexed_classes.add(cls)
    state.indexed_fqcn.add(cls_fqcn)

  def after_flush(self, session, flush_context):
    if not self.running or session is not db.session():
      return

    to_update = self.app_state.to_update
    session_objs = (
        ('new', session.new),
        ('deleted', session.deleted),
        ('changed', session.dirty),
    )
    for key, objs in session_objs:
      for obj in objs:
        model_name = fqcn(obj.__class__)
        adapter = self.adapted.get(model_name)
        if adapter is None or not adapter.indexable:
          continue
        to_update.append((key, obj))

  def after_commit(self, session):
    """
    Any db updates go through here. We check if any of these models have
    ``__searchable__`` fields, indicating they need to be indexed. With
    these we update the whoosh index for the model. If no index exists,
    it will be created here; this could impose a penalty on the initial
    commit of a model.
    """
    if (not self.running
        or session.transaction.nested  # inside a sub-transaction:
                                       # not yet written in the DB
        or session is not db.session()):
      # note: we haven't checked whether the session is enclosed in a
      # transaction at connection level. For now it's not a standard use
      # case; it would most likely happen during tests (which don't do
      # that for now).
      return

    primary_field = 'id'
    state = self.app_state
    items = []
    for op, obj in state.to_update:
      model_name = fqcn(obj.__class__)
      if (model_name not in self.adapted
          or not self.adapted[model_name].indexable):
        # safeguard
        continue

      # safeguard against DetachedInstanceError
      if sa.orm.object_session(obj) is not None:
        items.append((op, model_name, getattr(obj, primary_field), {}))

    if items:
      index_update.apply_async(kwargs=dict(index='default', items=items))
    self.clear_update_queue()
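
  # Illustration: each queued item is an (op, class name, primary key,
  # data) tuple, as consumed by the `index_update` task below; the class
  # name shown here is hypothetical:
  #
  #   ('changed', u'documents.Document', 42, {})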

  def get_document(self, obj, adapter=None):
    """
    Build the indexable document (a mapping of field name to value) for
    `obj`, or return None if no indexable adapter is registered for its
    class.
    """
    if adapter is None:
      class_name = fqcn(obj.__class__)
      adapter = self.adapted.get(class_name)

    if adapter is None or not adapter.indexable:
      return None

    document = adapter.get_document(obj)

    for k, v in document.items():
      if v is None:
        del document[k]
        continue
      if isinstance(v, (User, Group, Role)):
        document[k] = indexable_role(v)

    if not document.get('allowed_roles_and_users'):
      # no data for security: assume anybody can access the document
      document['allowed_roles_and_users'] = indexable_role(Anonymous)

    for func in self.app_state.value_provider_funcs:
      res = func(document, obj)
      if res is not None:
        document = res

    return document

  def index_objects(self, objects, index='default'):
    """
    Bulk index a list of objects.
    """
    if not objects:
      return

    index_name = index
    index = self.app_state.indexes[index_name]
    indexed = set()

    with index.writer() as writer:
      for obj in objects:
        document = self.get_document(obj)
        if document is None:
          continue

        object_key = document['object_key']
        if object_key in indexed:
          continue

        writer.delete_by_term('object_key', object_key)
        writer.add_document(**document)
        indexed.add(object_key)
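
  # Usage sketch: bulk (re)index all instances of a model, e.g. after a
  # schema change ('MyModel' is a hypothetical registered model):
  #
  #   service.index_objects(MyModel.query.all())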


service = WhooshIndexService()


@shared_task(ignore_result=True)
def index_update(index, items):
  """
  :param index: index name.
  :param items: list of (operation, full class name, primary key, data)
      tuples.
  """
  index_name = index
  index = service.app_state.indexes[index_name]
  adapted = service.adapted

  session = Session(bind=db.session.get_bind(None, None), autocommit=True)
  if not getattr(session, '_model_changes', None):
    # Flask-SQLAlchemy up to 1.0 needs this
    setattr(session, '_model_changes', {})

  updated = set()
  writer = AsyncWriter(index)
  try:
    for op, cls_name, pk, data in items:
      if pk is None:
        continue

      # always delete: the Whoosh manual says that 'update' is actually a
      # delete + add operation
      object_key = u'{}:{}'.format(cls_name, pk)
      writer.delete_by_term('object_key', object_key)

      adapter = adapted.get(cls_name)
      if not adapter:
        # FIXME: log to sentry?
        continue

      if object_key in updated:
        # don't add the same document twice in the same transaction: the
        # writer would not delete the previous record, resulting in
        # duplicate records for the same document
        continue

      if op in ("new", "changed"):
        with session.begin():
          obj = adapter.retrieve(pk, _session=session, **data)

        if obj is None:
          # deleted after the task was queued, but before the task ran
          continue

        document = service.get_document(obj, adapter)
        writer.add_document(**document)
        updated.add(object_key)
  except:
    writer.cancel()
    raise

  session.close()
  writer.commit()
  try:
    # async thread: wait for its termination
    writer.join()
  except RuntimeError:
    # happens when the actual writer was already available: AsyncWriter
    # didn't need to start a thread
    pass


class TestingStorage(RamStorage):
  """
  RamStorage whose temp_storage method returns another TestingStorage
  instead of a FileStorage.

  The reason is that FileStorage.temp_storage() creates temp files in
  /tmp/index_name.tmp/, which is subject to race conditions when many
  tests are run in parallel, including from different abilian-based
  packages.
  """
  def temp_storage(self, name=None):
    return TestingStorage()