# fts.py - part of the zeitgeist-extensions package (listing extracted from a Doxygen source page)
# -.- coding: utf-8 -.-

# Zeitgeist
#
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
# Copyright © 2010 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#
# TODO
#
# - Delete events hook
# - ? Filter on StorageState
# - Throttle IO and CPU where possible

import os, sys
import time
import pickle
import dbus
import dbus.service
from xdg import BaseDirectory
from xdg.DesktopEntry import DesktopEntry, xdg_data_dirs
import logging
import subprocess
from xml.dom import minidom
import xapian
import os
from Queue import Queue, Empty
import threading
from urllib import quote as url_escape, unquote as url_unescape
import gobject, gio

from zeitgeist.datamodel import Symbol, StorageState, ResultType, TimeRange, NULL_EVENT, NEGATION_OPERATOR
from _zeitgeist.engine.datamodel import Event, Subject
from _zeitgeist.engine.extension import Extension
from _zeitgeist.engine import constants
from zeitgeist.datamodel import Interpretation, Manifestation

# Configure the root logger for DEBUG output and create the module logger
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("zeitgeist.fts")

# On-disk location of the Xapian index and the DBus coordinates of the
# search service
INDEX_FILE = os.path.join(constants.DATA_PATH, "fts.index")
FTS_DBUS_OBJECT_PATH = "/org/gnome/zeitgeist/index/activity"
FTS_DBUS_INTERFACE = "org.gnome.zeitgeist.Index"

# Prefixes of the filter terms attached to every indexed document
FILTER_PREFIX_EVENT_INTERPRETATION = "ZGEI"
FILTER_PREFIX_EVENT_MANIFESTATION = "ZGEM"
FILTER_PREFIX_ACTOR = "ZGA"
FILTER_PREFIX_SUBJECT_INTERPRETATION = "ZGSI"
FILTER_PREFIX_SUBJECT_MANIFESTATION = "ZGSM"
FILTER_PREFIX_XDG_CATEGORY = "AC"

# Xapian document value slots holding the event id and the event timestamp
VALUE_EVENT_ID = 0
VALUE_TIMESTAMP = 1

# When sorting by one of the COALESCING_RESULT_TYPES result types,
# we need to fetch some extra events from the Xapian index because
# the final result set will be coalesced on some property of the event
COALESCING_RESULT_TYPES = [
      ResultType.MostRecentSubjects,
      ResultType.LeastRecentSubjects,
      ResultType.MostPopularSubjects,
      ResultType.LeastPopularSubjects,
      ResultType.MostRecentActor,
      ResultType.LeastRecentActor,
      ResultType.MostPopularActor,
      ResultType.LeastPopularActor,
]

class Deletion:
      """
      A marker class that marks an event id for deletion.

      Instances are put on the indexer's work queue; the worker tells them
      apart from events to be indexed with isinstance().
      """
      def __init__ (self, event_id):
            # Id of the event whose document should be removed from the index
            self.event_id = event_id

class SearchEngineExtension (Extension, dbus.service.Object):
      """
      Full text indexing and searching extension for Zeitgeist.

      The extension hooks into event insertion and deletion to keep the
      index up to date and exports a Search() method on the session bus
      at FTS_DBUS_OBJECT_PATH.
      """
      PUBLIC_METHODS = []
      
      def __init__ (self, engine):
            """
            Register the DBus object and create the Indexer for `engine`.
            """
            Extension.__init__(self, engine)
            dbus.service.Object.__init__(self, dbus.SessionBus(),
                                         FTS_DBUS_OBJECT_PATH)
            self._indexer = Indexer(self.engine)
      
      def pre_insert_event(self, event, sender):
            """
            Queue `event` for indexing and hand it back unmodified.
            """
            # Fix when Zeitgeist 0.5.1 hits the street use post_insert_event() instead
            self._indexer.index_event (event)
            return event

      def post_delete_events (self, ids, sender):
            """
            Queue removal from the index of every event id in `ids`.
            """
            for event_id in ids:
                  self._indexer.delete_event (event_id)
                  
      @dbus.service.method(FTS_DBUS_INTERFACE,
                           in_signature="s(xx)a("+constants.SIG_EVENT+")uuu",
                           out_signature="a("+constants.SIG_EVENT+")u")
      def Search(self, query_string, time_range, filter_templates, offset, count, result_type):
            """
            DBus method to perform a full text search against the contents of the
            Zeitgeist log. Returns an array of events together with the hit
            count reported by the indexer.
            """
            time_range = TimeRange(time_range[0], time_range[1])
            filter_templates = [Event(template) for template in filter_templates]
            events, hit_count = self._indexer.search(query_string, time_range,
                                                     filter_templates,
                                                     offset, count, result_type)
            return self._make_events_sendable (events), hit_count
      
      def _make_events_sendable(self, events):
            """
            Prepare `events` for sending over DBus: every event is made
            sendable in place and None entries are replaced by NULL_EVENT.
            Returns the resulting list.
            """
            sendable = []
            for event in events:
                  if event is None:
                        sendable.append(NULL_EVENT)
                  else:
                        event._make_dbus_sendable()
                        sendable.append(event)
            return sendable

class Indexer:
      """
      Abstraction of the FT indexer and search engine.

      Changes to the index are queued and applied by a background worker
      thread; searches are served from the same Xapian database.
      """
      
      # Query syntax features enabled when parsing search strings
      QUERY_PARSER_FLAGS = (xapian.QueryParser.FLAG_PHRASE |
                            xapian.QueryParser.FLAG_BOOLEAN |
                            xapian.QueryParser.FLAG_PURE_NOT |
                            xapian.QueryParser.FLAG_LOVEHATE |
                            xapian.QueryParser.FLAG_WILDCARD)
      
      def __init__ (self, engine):
            """
            Open (or create) the index at INDEX_FILE, configure the query
            parser and start the worker thread that applies queued changes.

            `engine` is the Zeitgeist engine used to look up events when
            searching and when rebuilding an empty index.
            """
            self._engine = engine
      
            log.debug("Opening full text index: %s" % INDEX_FILE)
            self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OPEN)
            self._tokenizer = xapian.TermGenerator()
            
            parser = xapian.QueryParser()
            parser.set_database (self._index)
            # Free text prefixes usable in queries, eg. "name:foo"
            for field, prefix in (("name", "N"),
                                  ("title", "N"),
                                  ("site", "S"),
                                  ("app", "A")):
                  parser.add_prefix(field, prefix)
            # Boolean prefixes used by the compiled event template filters
            for field, prefix in (("zgei", FILTER_PREFIX_EVENT_INTERPRETATION),
                                  ("zgem", FILTER_PREFIX_EVENT_MANIFESTATION),
                                  ("zga", FILTER_PREFIX_ACTOR),
                                  ("zgsi", FILTER_PREFIX_SUBJECT_INTERPRETATION),
                                  ("zgsm", FILTER_PREFIX_SUBJECT_MANIFESTATION)):
                  parser.add_boolean_prefix(field, prefix)
            parser.add_prefix("category", FILTER_PREFIX_XDG_CATEGORY)
            # Range queries over the event id and timestamp value slots. The
            # "ms" marker is the one appended by the time range filter query.
            parser.add_valuerangeprocessor(
                  xapian.NumberValueRangeProcessor(VALUE_EVENT_ID, "id", True))
            parser.add_valuerangeprocessor(
                  xapian.NumberValueRangeProcessor(VALUE_TIMESTAMP, "ms", False))
            parser.set_default_op(xapian.Query.OP_AND)
            self._query_parser = parser
            self._enquire = xapian.Enquire(self._index)
            
            # Cache of parsed DesktopEntrys
            self._desktops = {}
            
            gobject.threads_init()
            self._may_run = True
            self._queue = Queue(0)
            self._worker = threading.Thread(target=self._worker_thread,
                                            name="IndexWorker")
            self._worker.daemon = True
            self._worker.start()
            
            if self._index.get_doccount() == 0:
                  # We must delay reindexing until after the engine is done setting up
                  log.info("Empty index detected. Doing full rebuild")
                  gobject.idle_add (self._reindex)
            
            # List term freqs
            #for term in self._index.allterms():
            #     print term.term, term.termfreq
      
00188       def _reindex (self):
            """
            Index everything in the ZG log
            """
            all_events = self._engine.find_events(TimeRange.always(),
                                                  [], StorageState.Any,
                                                  sys.maxint,
                                                  ResultType.MostRecentEvents)
            log.info("Preparing to index %s events" % len(all_events))
            for e in all_events : self._queue.put(e)
      
00199       def index_event (self, event):
            """
            This method schedules and event for indexing. It returns immediate and
            defers the actual work to a bottom half thread. This means that it
            will not block the main loop of the Zeitgeist daemon while indexing
            (which may be a heavy operation)
            """
            self._queue.put (event)
            return event
      
00209       def delete_event (self, event_id):
            """
            Remove an event from the index given its event id
            """
            self._queue.put (Deletion(event_id))
            return            
      
00216       def search (self, query_string, time_range=None, filters=None, offset=0, maxhits=10, result_type=100):
            """
            Do a full text search over the indexed corpus. The `result_type`
            parameter may be a zeitgeist.datamodel.ResultType or 100. In case it is
            100 the textual relevancy of the search engine will be used to sort the
            results. Result type 100 is the fastest (and default) mode.
            
            The filters argument should be a list of event templates.
            """
            # Expand event template filters if necessary
            if filters:
                  query_string = "(%s) AND (%s)" % (query_string, self._compile_event_filter_query (filters))
            
            # Expand time range value query
            if time_range and not time_range.is_always():
                  query_string = "(%s) AND (%s)" % (query_string, self._compile_time_range_filter_query (time_range))
            
            # If the result type coalesces the events we need to fetch some extra
            # events from the index to have a chance of actually holding 'maxhits'
            # unique events
            if result_type in COALESCING_RESULT_TYPES:
                  raw_maxhits = maxhits * 3
            else:
                  raw_maxhits = maxhits
            
            # When not sorting by relevance, we fetch the results from Xapian sorted,
            # by timestamp. That minimizes the skew we get from otherwise doing a
            # relevancy ranked xapaian query and then resorting with Zeitgeist. The
            # "skew" is that low-relevancy results may still have the highest timestamp
            if result_type == 100:
              self._enquire.set_sort_by_relevance()
            else:
              self._enquire.set_sort_by_value(VALUE_TIMESTAMP, True)
            
            # Allow wildcards
            query_start = time.time()
            query = self._query_parser.parse_query (query_string,
                                                    self.QUERY_PARSER_FLAGS)
            self._enquire.set_query (query)
            hits = self._enquire.get_mset (offset, raw_maxhits)
            hit_count = hits.get_matches_estimated()
            log.debug("Search '%s' gave %s hits in %sms" %
                      (query_string, hits.get_matches_estimated(), (time.time() - query_start)*1000))
            
            if result_type == 100:
                  event_ids = []
                  for m in hits:
                        event_id = int(xapian.sortable_unserialise(
                                                  m.document.get_value(VALUE_EVENT_ID)))                    
                        event_ids.append (event_id)
                  if event_ids:
                        return self._engine.get_events(ids=event_ids), hit_count
                  else:
                        return [], 0
            else:
                  templates = []
                  for m in hits:
                        event_id = int(xapian.sortable_unserialise(
                                                  m.document.get_value(VALUE_EVENT_ID)))
                        ev = Event()
                        ev[0][Event.Id] = str(event_id)
                        templates.append(ev)
            
                  if templates:
                        return self._engine._find_events(1, TimeRange.always(),
                                                         templates,
                                                         StorageState.Any,
                                                         maxhits,
                                                         result_type), hit_count
                  else:
                        return [], 0
      
      def _worker_thread (self):
            """
            Main loop of the index worker thread.

            Takes items off the work queue and applies them to the index:
            Deletion markers remove a document, anything else is indexed as
            an event. Once the queue has been idle for half a second after a
            change the index is flushed to disk.
            """
            has_pending_changes = False
            while self._may_run:
                  # FIXME: Throttle IO and CPU
                  
                  # With unflushed changes we only wait a short while before
                  # flushing; with a clean index we block indefinitely to
                  # avoid needless wakeups
                  timeout = 0.5 if has_pending_changes else None
                  try:
                        item = self._queue.get(True, timeout)
                  except Empty:
                        if not has_pending_changes:
                              log.debug("No changes to index. Sleeping")
                              continue
                        # Write changes to disk
                        log.debug("Committing FTS index")
                        self._index.flush()
                        has_pending_changes = False
                  else:
                        if isinstance (item, Deletion):
                              self._delete_event_real (item.event_id)
                        else:
                              self._index_event_real (item)
                        has_pending_changes = True
      
00316       def _delete_event_real (self, event_id):
            """
            Look up the doc id given an event id and remove the xapian.Document
            for that doc id.
            Note: This is slow, but there's not much we can do about it
            """
            try:
                  _id = xapian.sortable_serialise(float(event_id))
                  query = xapian.Query(xapian.Query.OP_VALUE_RANGE, 
                                       VALUE_EVENT_ID, _id, _id)
                  
                  self._enquire.set_query (query)
                  hits = self._enquire.get_mset (0, 10)
                  
                  total = hits.get_matches_estimated()
                  if total > 1:
                        log.warning ("More than one event found with id '%s'" % event_id)
                  elif total <= 0:
                        log.debug ("No event for id '%s'" % event_id)
                        return
            
                  for m in hits:
                        log.debug("Deleting event '%s' with docid '%s'" %
                                  (event_id, m.docid))
                        self._index.delete_document(m.docid)
            except Exception, e:
                  log.error("Failed to delete event '%s': %s" % (event_id, e))
            
00344       def _split_uri (self, uri):
            """
            Returns a triple of (scheme, host, and path) extracted from `uri`
            """         
            i = uri.find(":")
            if i == -1 :
                  scheme =  ""
                  host = ""
                  path = uri
            else:
                  scheme = uri[:i]
                  host = ""
                  path = ""
            
            if uri[i+1] == "/" and uri[i+2] == "/":
                  j = uri.find("/", i+3)
                  if j == -1 :
                        host = uri[i+3:]
                  else:
                        host = uri[i+3:j]
                        path = uri[j:]
            else:
                  host = uri[i+1:]
            
            # Strip out URI query part
            i = path.find("?")
            if i != -1:
                  path = path[:i]
            
            return scheme, host, path
      
00375       def _get_desktop_entry (self, app_id):
            """
            Return a xdg.DesktopEntry.DesktopEntry `app_id` or None in case
            no file is found for the given desktop id
            """
            if app_id in self._desktops:
                  return self._desktops[app_id]
            
            for datadir in xdg_data_dirs:
                  path = os.path.join(datadir, "applications", app_id)
                  if os.path.exists(path):
                        try:
                              desktop = DesktopEntry(path)
                              self._desktops[app_id] = desktop
                              return desktop
                        except Exception, e:
                              log.warning("Unable to load %s: %s" % (path, e))
                              return None
            
            return None
      
00396       def _index_actor (self, actor):
            """
            Takes an actor as a path to a .desktop file or app:// uri
            and index the contents of the corresponding .desktop file
            into the document currently set for self._tokenizer.
            """
            if not actor : return
            
            # Get the path of the .desktop file and convert it to
            # an app id (eg. 'gedit.desktop')
            scheme, host, path = self._split_uri(url_unescape (actor))
            if not path:
                  path = host
            
            if not path :
                  log.debug("Unable to determine application id for %s" % actor)
                  return
            
            if path.startswith("/") :
                  path = os.path.basename(path)
            
            desktop = self._get_desktop_entry(path)
            if desktop:
                  if not desktop.getNoDisplay():
                        self._tokenizer.index_text(desktop.getName(), 5)
                        self._tokenizer.index_text(desktop.getName(), 5, "A")
                        self._tokenizer.index_text(desktop.getGenericName(), 5)
                        self._tokenizer.index_text(desktop.getGenericName(), 5, "A")
                        self._tokenizer.index_text(desktop.getComment(), 2)
                        self._tokenizer.index_text(desktop.getComment(), 2, "A")
                  
                        doc = self._tokenizer.get_document()
                        for cat in desktop.getCategories():
                              doc.add_term(FILTER_PREFIX_XDG_CATEGORY+cat.lower())
            else:
                  log.debug("Unable to look up app info for %s" % actor)
            
      
00434       def _index_uri (self, uri):
            """
            Index `uri` into the document currectly set on self._tokenizer
            """
            # File URIs and paths are indexed in one way, and all other,
            # usually web URIs, are indexed in another way because there may
            # be domain name etc. in there we want to rank differently
            scheme, host, path = self._split_uri (url_unescape (uri))
            if scheme == "file://" or not scheme:
                  path, name = os.path.split(path)
                  self._tokenizer.index_text(name, 5)
                  self._tokenizer.index_text(name, 5, "N")
                  
                  # Index parent names with descending weight
                  weight = 5
                  while path and name:
                        weight = weight / 1.5
                        path, name = os.path.split(path)
                        self._tokenizer.index_text(name, weight)
                  
            elif scheme == "mailto:":
                  tokens = host.split("@")
                  name = tokens[0]
                  self._tokenizer.index_text(name, 6)
                  if len(tokens) > 1:
                        self._tokenizer.index_text(" ".join[1:], 1)
            else:
                  path, name = os.path.split(path)
                  if name:
                        self._tokenizer.index_text(name, 5)
                        self._tokenizer.index_text(name, 5, "N")
                  if path:
                        self._tokenizer.index_text(path, 1)
                        self._tokenizer.index_text(path, 1, "N")
                  if host:
                        self._tokenizer.index_text(host, 2)
                        self._tokenizer.index_text(host, 2, "N")
                        self._tokenizer.index_text(host, 2, "S")
      
00473       def _index_text (self, text):
            """
            Index `text` as raw text data for the document currently
            set on self._tokenizer. The text is assumed to be a primary
            description of the subject, such as the basename of a file.
            
            Primary use is for subject.text
            """
            self._tokenizer.index_text(text, 5)
      
      def _index_contents (self, uri):
            """
            Run the external `xmlindexer` tool on the file behind `uri` and
            index the text it extracts into the document currently set on
            self._tokenizer.
            
            `uri` may be a file:// URI or a plain file path. Errors from
            spawning the tool or from parsing its output are propagated to
            the caller.
            """
            # xmlindexer doesn't extract words for URIs only for file paths
            
            # FIXME: IONICE and NICE on xmlindexer
            
            file_prefix = "file://"
            if uri.startswith(file_prefix):
                  # Only strip the leading scheme and undo the URI escaping
                  # to get back to the real file name
                  path = url_unescape(uri[len(file_prefix):])
            else:
                  path = uri
            
            xmlindexer = subprocess.Popen(['xmlindexer', path],
                                          stdout=subprocess.PIPE)
            # communicate() reads all output and waits for the process to exit
            xml = xmlindexer.communicate()[0].strip()
            if not xml:
                  log.debug("xmlindexer produced no output for '%s'" % path)
                  return
            
            dom = minidom.parseString(xml)
            text_nodes = dom.getElementsByTagName("text")
            if not text_nodes:
                  return
            
            # Only character data nodes carry text, skip nested elements
            text_types = (minidom.Node.TEXT_NODE, minidom.Node.CDATA_SECTION_NODE)
            lines = [node.data for node in text_nodes[0].childNodes
                     if node.nodeType in text_types]
            
            if lines:
                  self._tokenizer.index_text (" ".join(lines))
            
      
00505       def _add_doc_filters (self, event, doc):
            """Adds the filtering rules to the doc. Filtering rules will
               not affect the relevancy ranking of the event/doc"""
            doc.add_term (FILTER_PREFIX_EVENT_INTERPRETATION+event.interpretation)
            doc.add_term (FILTER_PREFIX_EVENT_MANIFESTATION+event.manifestation)
            doc.add_term (FILTER_PREFIX_ACTOR+event.actor)
            
            for su in event.subjects:
                  doc.add_term (FILTER_PREFIX_SUBJECT_INTERPRETATION+su.interpretation)
                  doc.add_term (FILTER_PREFIX_SUBJECT_MANIFESTATION+su.manifestation)
      
      def _index_event_real (self, event):
            """
            Build a document for `event` and add it to the index.
            
            The event id and timestamp are stored in value slots, the actor
            and every subject with a URI are indexed as text, and the filter
            terms are attached. Objects that are not Events and events
            without an id are skipped. Errors while indexing are logged and
            swallowed so that one bad event can not stop the caller.
            """
            if not isinstance (event, Event):
                  log.error("Not an Event, found: %s" % type(event))
                  return
            if not event.id:
                  log.debug("Not indexing event. Event has no id")
                  return
            
            try:
                  doc = xapian.Document()
                  doc.add_value (VALUE_EVENT_ID,
                                 xapian.sortable_serialise(float(event.id)))
                  doc.add_value (VALUE_TIMESTAMP,
                                 xapian.sortable_serialise(float(event.timestamp)))
                  self._tokenizer.set_document (doc)
            
                  self._index_actor (event.actor)
            
                  for subject in event.subjects:
                        if not subject.uri:
                              continue
                        log.debug("Indexing '%s'" % subject.uri)
                        self._index_uri (subject.uri)
                        # A subject without text must not abort the whole event
                        if subject.text:
                              self._index_text (subject.text)
                        if subject.uri.startswith ("application://"):
                              self._index_actor (subject.uri)
                        # FIXME: index origin into an origin: prefix
                        # FIXME: index uri into a uri: prefix
                        #self._index_contents (subject.uri)
                        # FIXME: Possibly index payloads when we have apriori knowledge
                  
                  self._add_doc_filters (event, doc)
                  
                  self._index.add_document (doc)
            except Exception as e:
                  log.error("Error indexing event: %s" % e)

      def _expand_type (self, type_prefix, uri):
            """
            Compile a query fragment matching the symbol `uri` or any of the
            child URIs reported by Symbol.find_child_uris_extended().
            
            Every URI becomes a "<type_prefix>:<uri>" term and the terms are
            joined with OR. If `uri` starts with NEGATION_OPERATOR the
            operator is stripped and the expression is wrapped in "NOT (...)".
            """
            negated = uri.startswith(NEGATION_OPERATOR)
            if negated:
                  uri = uri[1:]
            
            terms = []
            for child in Symbol.find_child_uris_extended(uri):
                  terms.append("%s:%s" % (type_prefix, child))
            expression = " OR ".join(terms)
            
            if negated:
                  return "NOT (%s)" % expression
            return expression

00561       def _compile_event_filter_query (self, events):
            """Takes a list of event templates and compiles a filter query
               based on their, interpretations, manifestations, and actor,
               for event and subjects.
               
               All fields within the same event will be ANDed and each template
               will be ORed with the others. Like elsewhere in Zeitgeist the
               type tree of the interpretations and manifestations will be expanded
               to match all child symbols as well
            """
            query = []
            for event in events:
                  if not isinstance(event, Event):
                        raise TypeError("Expected Event. Found %s" % type(event))
                  
                  tmpl = []
                  if event.interpretation :
                        tmpl.append(self._expand_type("zgei", event.interpretation))
                  if event.manifestation :
                        tmpl.append(self._expand_type("zgem", event.manifestation))
                  if event.actor : tmpl.append("zga:"+event.actor)
                  for su in event.subjects:
                        if su.interpretation :
                              tmpl.append(self._expand_type("zgsi", su.interpretation))
                        if su.manifestation :
                              tmpl.append(self._expand_type("zgsm", su.manifestation))
                  
                  tmpl = "(" + ") AND (".join(tmpl) + ")"
                  query.append(tmpl)
            
            return " OR ".join(query)
      
00593       def _compile_time_range_filter_query (self, time_range):
            """Takes a TimeRange and compiles a range query for it"""
            
            if not isinstance(time_range, TimeRange):
                  raise TypeError("Expected TimeRange, but found %s" % type(time_range))
            
            return "%s..%sms" % (time_range.begin, time_range.end)

if __name__ == "__main__":
      # Manual smoke test: print the filter query compiled for a template
      # matching document subjects
      indexer = Indexer(None)
      print(indexer._compile_event_filter_query([Event.new_for_values(subject_interpretation="http://www.semanticdesktop.org/ontologies/2007/03/22/nfo#Document")]))

# Generated by Doxygen 1.6.0