
fts.py

# -*- coding: utf-8 -*-

# Zeitgeist
#
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
# Copyright © 2010 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#
# TODO
#
# - Delete events hook
# - ? Filter on StorageState
# - Throttle IO and CPU where possible

import os, sys
import time
import pickle
import dbus
import dbus.service
from xdg import BaseDirectory
import logging
import subprocess
from xml.dom import minidom
import xapian
from Queue import Queue, Empty
import threading
import gobject, gio

from zeitgeist.datamodel import Symbol, StorageState, ResultType, TimeRange, NULL_EVENT, NEGATION_OPERATOR
from _zeitgeist.engine.datamodel import Event, Subject
from _zeitgeist.engine.extension import Extension
from _zeitgeist.engine import constants
from zeitgeist.datamodel import Interpretation, Manifestation

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("zeitgeist.fts")

INDEX_FILE = os.path.join(constants.DATA_PATH, "fts.index")
FTS_DBUS_OBJECT_PATH = "/org/gnome/zeitgeist/index/activity"
FTS_DBUS_INTERFACE = "org.gnome.zeitgeist.Index"
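
# A minimal client-side sketch (not part of this module) of calling the index
# over D-Bus, assuming a running Zeitgeist daemon with this extension loaded;
# the daemon is assumed to own the org.gnome.zeitgeist.Engine bus name:
#
#     import dbus
#     bus = dbus.SessionBus()
#     index = bus.get_object("org.gnome.zeitgeist.Engine", FTS_DBUS_OBJECT_PATH)
#     events, estimated_hits = index.Search(
#           "invoice", (0, 2**63 - 1), [], 0, 10, 100,
#           dbus_interface=FTS_DBUS_INTERFACE)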

FILTER_PREFIX_EVENT_INTERPRETATION = "ZGEI"
FILTER_PREFIX_EVENT_MANIFESTATION = "ZGEM"
FILTER_PREFIX_ACTOR = "ZGA"
FILTER_PREFIX_SUBJECT_INTERPRETATION = "ZGSI"
FILTER_PREFIX_SUBJECT_MANIFESTATION = "ZGSM"

VALUE_EVENT_ID = 0
VALUE_TIMESTAMP = 1

# When sorting by one of the COALESCING_RESULT_TYPES result types,
# we need to fetch some extra events from the Xapian index because
# the final result set will be coalesced on some property of the event
COALESCING_RESULT_TYPES = [ \
      ResultType.MostRecentSubjects,
      ResultType.LeastRecentSubjects,
      ResultType.MostPopularSubjects,
      ResultType.LeastPopularSubjects,
      ResultType.MostRecentActor,
      ResultType.LeastRecentActor,
      ResultType.MostPopularActor,
      ResultType.LeastPopularActor,
]

class SearchEngineExtension (Extension, dbus.service.Object):
      """
      Full text indexing and searching extension for Zeitgeist
      """
      PUBLIC_METHODS = []
      
      def __init__ (self, engine):
            Extension.__init__(self, engine)
            dbus.service.Object.__init__(self, dbus.SessionBus(),
                                         FTS_DBUS_OBJECT_PATH)
            self._indexer = Indexer(self.engine)
      
      def pre_insert_event(self, event, sender):
            # FIXME: When Zeitgeist 0.5.1 hits the street, use post_insert_event() instead
            self._indexer.index_event (event)
            return event
            
      @dbus.service.method(FTS_DBUS_INTERFACE,
                           in_signature="s(xx)a("+constants.SIG_EVENT+")uuu",
                           out_signature="a("+constants.SIG_EVENT+")u")
      def Search(self, query_string, time_range, filter_templates, offset, count, result_type):
            """
            DBus method to perform a full text search against the contents of the
            Zeitgeist log. Returns an array of events.
            """
            time_range = TimeRange(time_range[0], time_range[1])
            filter_templates = map(Event, filter_templates)
            events, hit_count = self._indexer.search(query_string, time_range,
                                                     filter_templates,
                                                     offset, count, result_type)
            return self._make_events_sendable (events), hit_count
      
      def _make_events_sendable(self, events):
            for event in events:
                  if event is not None:
                        event._make_dbus_sendable()
            return [NULL_EVENT if event is None else event for event in events]

class Indexer:
      """
      Abstraction of the FT indexer and search engine
      """
      
      QUERY_PARSER_FLAGS = xapian.QueryParser.FLAG_PHRASE |   \
                           xapian.QueryParser.FLAG_BOOLEAN |  \
                           xapian.QueryParser.FLAG_PURE_NOT |  \
                           xapian.QueryParser.FLAG_LOVEHATE | \
                           xapian.QueryParser.FLAG_WILDCARD
      
      def __init__ (self, engine):
            self._engine = engine
      
            log.debug("Opening full text index: %s" % INDEX_FILE)
            self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OPEN)
            self._tokenizer = xapian.TermGenerator()
            self._query_parser = xapian.QueryParser()
            self._query_parser.set_database (self._index)
            self._query_parser.add_prefix("name", "N")
            self._query_parser.add_prefix("title", "N")
            self._query_parser.add_prefix("site", "S")
            self._query_parser.add_prefix("app", "A")
            self._query_parser.add_boolean_prefix("zgei", FILTER_PREFIX_EVENT_INTERPRETATION)
            self._query_parser.add_boolean_prefix("zgem", FILTER_PREFIX_EVENT_MANIFESTATION)
            self._query_parser.add_boolean_prefix("zga", FILTER_PREFIX_ACTOR)
            self._query_parser.add_boolean_prefix("zgsi", FILTER_PREFIX_SUBJECT_INTERPRETATION)
            self._query_parser.add_boolean_prefix("zgsm", FILTER_PREFIX_SUBJECT_MANIFESTATION)
            self._query_parser.add_valuerangeprocessor(
                  xapian.NumberValueRangeProcessor(VALUE_EVENT_ID, "id", True))
            self._query_parser.add_valuerangeprocessor(
                  xapian.NumberValueRangeProcessor(VALUE_TIMESTAMP, "ms", False))
            self._query_parser.set_default_op(xapian.Query.OP_AND)
            self._enquire = xapian.Enquire(self._index)
            
            gobject.threads_init()
            self._may_run = True
            self._queue = Queue(0)
            self._worker = threading.Thread(target=self._worker_thread,
                                            name="IndexWorker")
            self._worker.daemon = True
            self._worker.start()
            
            if self._index.get_doccount() == 0:
                  # We must delay reindexing until after the engine is done setting up
                  log.info("Empty index detected. Doing full rebuild")
                  gobject.idle_add (self._reindex)
            
            # List term freqs
            #for term in self._index.allterms():
            #     print term.term, term.termfreq
      
      def _reindex (self):
            """
            Index everything in the ZG log
            """
            all_events = self._engine.find_events(TimeRange.always(),
                                                  [], StorageState.Any,
                                                  sys.maxint,
                                                  ResultType.MostRecentEvents)
            log.info("Preparing to index %s events" % len(all_events))
            for e in all_events : self._queue.put(e)
      
      def index_event (self, event):
            """
            This method schedules and event for indexing. It returns immediate and
            defers the actual work to a bottom half thread. This means that it
            will not block the main loop of the Zeitgeist daemon while indexing
            (which may be a heavy operation)
            """
            self._queue.put (event)
            return event
      
      def search (self, query_string, time_range=None, filters=None, offset=0, maxhits=10, result_type=100):
            """
            Do a full text search over the indexed corpus. The `result_type`
            parameter may be a zeitgeist.datamodel.ResultType or 100. If it is
            100, the results are sorted by the search engine's textual
            relevancy. Result type 100 is the fastest (and default) mode.
            
            The `filters` argument should be a list of event templates.
            """
            # Expand event template filters if necessary
            if filters:
                  query_string = "(%s) AND (%s)" % (query_string, self._compile_event_filter_query (filters))
            
            # Expand time range value query
            if time_range and not time_range.is_always():
                  query_string = "(%s) AND (%s)" % (query_string, self._compile_time_range_filter_query (time_range))
            
            # If the result type coalesces the events we need to fetch some extra
            # events from the index to have a chance of actually holding 'maxhits'
            # unique events
            if result_type in COALESCING_RESULT_TYPES:
                  raw_maxhits = maxhits * 3
            else:
                  raw_maxhits = maxhits
            
            # Allow wildcards
            query_start = time.time()
            query = self._query_parser.parse_query (query_string,
                                                    self.QUERY_PARSER_FLAGS)
            self._enquire.set_query (query)
            hits = self._enquire.get_mset (offset, raw_maxhits)
            hit_count = hits.get_matches_estimated()
            log.debug("Search '%s' gave %s hits in %sms" %
                      (query_string, hits.get_matches_estimated(), (time.time() - query_start)*1000))
            
            if result_type == 100:
                  event_ids = []
                  for m in hits:
                        event_id = int(xapian.sortable_unserialise(
                                                  m.document.get_value(VALUE_EVENT_ID)))
                        log.debug("%i: %i%% docid=%i eventid=%s" %
                                  (m.rank + 1, m.percent, m.docid, event_id))
                        event_ids.append (event_id)
                  if event_ids:
                        return self._engine.get_events(ids=event_ids), hit_count
                  else:
                        return [], 0
            else:
                  templates = []
                  for m in hits:
                        event_id = int(xapian.sortable_unserialise(
                                                  m.document.get_value(VALUE_EVENT_ID)))
                        log.debug("%i: %i%% docid=%i eventid=%s" %
                                  (m.rank + 1, m.percent, m.docid, event_id))
                        ev = Event()
                        ev[0][Event.Id] = str(event_id)
                        templates.append(ev)
            
                  if templates:
                        return self._engine._find_events(1, TimeRange.always(),
                                                         templates,
                                                         StorageState.Any,
                                                         maxhits,
                                                         result_type), hit_count
                  else:
                        return [], 0
      
      def _worker_thread (self):
            is_dirty = False
            while self._may_run:
                  # FIXME: Throttle IO and CPU
                  try:
                        # If we are dirty wait a while before we flush,
                        # or if we are clean wait indefinitely to avoid
                        # needless wakeups
                        if is_dirty:
                              event = self._queue.get(True, 0.5)
                        else:
                              event = self._queue.get(True)
                        
                        self._index_event_real (event)
                        is_dirty = True
                  except Empty:
                        if is_dirty:
                              # Write changes to disk
                              log.debug("Committing FTS index")
                              self._index.flush()
                              is_dirty = False
                        else:
                              log.debug("No changes to index. Sleeping")
      
      def _split_uri (self, uri):
            """
            Returns a (scheme, host, path) triple extracted from `uri`
            """
            i = uri.find(":")
            if i == -1:
                  # No scheme separator; treat the whole string as a path
                  return "", "", uri
            
            scheme = uri[:i]
            host = ""
            path = ""
            
            # Slicing (rather than indexing) avoids an IndexError on short URIs
            if uri[i+1:i+3] == "//":
                  j = uri.find("/", i+3)
                  if j == -1:
                        host = uri[i+3:]
                  else:
                        host = uri[i+3:j]
                        path = uri[j:]
            else:
                  host = uri[i+1:]
            
            # Strip out URI query part
            i = path.find("?")
            if i != -1:
                  path = path[:i]
            
            return scheme, host, path
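      
      # Example behaviour of _split_uri() (a sketch, not from the original
      # source):
      #
      #     _split_uri("http://example.com/a/b?q=1")  ->  ("http", "example.com", "/a/b")
      #     _split_uri("mailto:john@example.com")     ->  ("mailto", "john@example.com", "")
      #     _split_uri("/home/user/doc.txt")          ->  ("", "", "/home/user/doc.txt")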
      
      def _get_appinfo (self, app_id):
            """
            Return a gio.AppInfo for `app_id`
            """
            # FIXME: Use an LRUCache for appinfos
            return gio.unix.DesktopAppInfo(app_id)
      
      def _index_actor (self, actor):
            """
            Takes an actor as a path to a .desktop file or an app:// URI
            and indexes the contents of the corresponding .desktop file
            into the document currently set on self._tokenizer.
            """
            # Get the path of the .desktop file and convert it to
            # an app id (eg. 'gedit.desktop')
            scheme, host, path = self._split_uri(actor)
            if not path:
                  path = host
            
            if not path :
                  log.debug("Unable to determine application id for %s" % actor)
                  return
            
            if path.startswith("/") :
                  path = os.path.basename(path)
            
            appinfo = self._get_appinfo(path)
            if appinfo:
                  self._tokenizer.index_text(appinfo.get_name(), 5)
                  self._tokenizer.index_text(appinfo.get_name(), 5, "A")
                  # get_description() may return None for apps without a Comment field
                  description = appinfo.get_description() or ""
                  self._tokenizer.index_text(description, 2)
                  self._tokenizer.index_text(description, 2, "A")
            else:
                  log.debug("Unable to look up app info for %s" % actor)
            
      
      def _index_uri (self, uri):
            """
            Index `uri` into the document currently set on self._tokenizer
            """
            # File URIs and paths are indexed in one way, and all other,
            # usually web URIs, are indexed in another way because there may
            # be domain name etc. in there we want to rank differently
            scheme, host, path = self._split_uri (uri)
            if scheme == "file://" or not scheme:
                  path, name = os.path.split(path)
                  self._tokenizer.index_text(name, 5)
                  self._tokenizer.index_text(name, 5, "N")
                  
                  # Index parent names with descending weight
                  weight = 5
                  while path and name:
                        weight = weight / 1.5
                        path, name = os.path.split(path)
                        self._tokenizer.index_text(name, weight)
                  
            elif scheme == "mailto:":
                  tokens = host.split("@")
                  name = tokens[0]
                  self._tokenizer.index_text(name, 6)
                  if len(tokens) > 1:
                        self._tokenizer.index_text(" ".join[1:], 1)
            else:
                  path, name = os.path.split(path)
                  if name:
                        self._tokenizer.index_text(name, 5)
                        self._tokenizer.index_text(name, 5, "N")
                  if path:
                        self._tokenizer.index_text(path, 1)
                        self._tokenizer.index_text(path, 1, "N")
                  if host:
                        self._tokenizer.index_text(host, 2)
                        self._tokenizer.index_text(host, 2, "N")
                        self._tokenizer.index_text(host, 2, "S")
      
      def _index_text (self, text):
            """
            Index `text` as raw text data for the document currently
            set on self._tokenizer. The text is assumed to be a primary
            description of the subject, such as the basename of a file.
            
            Primary use is for subject.text
            """
            self._tokenizer.index_text(text, 5)
      
      def _index_contents (self, uri):
            # xmlindexer doesn't extract words for URIs only for file paths
            
            # FIXME: IONICE and NICE on xmlindexer
            
            path = uri.replace("file://", "")
            xmlindexer = subprocess.Popen(['xmlindexer', path],
                                          stdout=subprocess.PIPE)
            xml = xmlindexer.communicate()[0].strip()
            xmlindexer.wait()       
            
            dom = minidom.parseString(xml)
            text_nodes = dom.getElementsByTagName("text")
            lines = []
            if text_nodes:
                  for line in text_nodes[0].childNodes:
                        lines.append(line.data)
            
            if lines:
                  self._tokenizer.index_text(" ".join(lines))
            
      
      def _add_doc_filters (self, event, doc):
            """Adds the filtering rules to the doc. Filtering rules will
               not affect the relevancy ranking of the event/doc"""
            doc.add_term (FILTER_PREFIX_EVENT_INTERPRETATION+event.interpretation)
            doc.add_term (FILTER_PREFIX_EVENT_MANIFESTATION+event.manifestation)
            doc.add_term (FILTER_PREFIX_ACTOR+event.actor)
            
            for su in event.subjects:
                  doc.add_term (FILTER_PREFIX_SUBJECT_INTERPRETATION+su.interpretation)
                  doc.add_term (FILTER_PREFIX_SUBJECT_MANIFESTATION+su.manifestation)
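
      # For illustration (not from the original source): an event whose subject
      # has the nfo#Document interpretation gets the boolean term
      # "ZGSIhttp://www.semanticdesktop.org/ontologies/2007/03/22/nfo#Document",
      # which the "zgsi" prefix registered on the query parser matches without
      # influencing relevancy ranking.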
      
      def _index_event_real (self, event):
            if not isinstance (event, Event):
                  log.error("Not an Event, found: %s" % type(event))
                  return
            if not event.id:
                  log.debug("Not indexing event. Event has no id")
                  return
            
            try:
                  doc = xapian.Document()
                  doc.add_value (VALUE_EVENT_ID,
                                 xapian.sortable_serialise(float(event.id)))
                  doc.add_value (VALUE_TIMESTAMP,
                                 xapian.sortable_serialise(float(event.timestamp)))
                  self._tokenizer.set_document (doc)
            
                  self._index_actor (event.actor)
            
                  for subject in event.subjects:
                        if not subject.uri : continue
                        log.debug("Indexing '%s'" % subject.uri)
                        self._index_uri (subject.uri)
                        self._index_text (subject.text)
                        # FIXME: index origin into an origin: prefix
                        # FIXME: index uri into a uri: prefix
                        #self._index_contents (subject.uri)
                        # FIXME: Possibly index payloads when we have apriori knowledge
                  
                  self._add_doc_filters (event, doc)  
                        
                  self._index.add_document (doc)
            except Exception, e:
                  log.error("Error indexing event: %s" % e)

      def _expand_type (self, type_prefix, uri):
            is_negation = uri.startswith(NEGATION_OPERATOR)
            uri = uri[1:] if is_negation else uri
            children = Symbol.find_child_uris_extended(uri)
            children = [ "%s:%s" % (type_prefix, child) for child in children ]

            result = " OR ".join(children)

            return result if not is_negation else "NOT (%s)" % result
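
      # Sketch of the expansion (URIs abbreviated; assuming NEGATION_OPERATOR
      # is a leading "!"):
      #
      #     _expand_type("zgei", "<uri>")
      #           -> "zgei:<uri> OR zgei:<child-uri> OR ..."
      #     _expand_type("zgei", "!<uri>")
      #           -> "NOT (zgei:<uri> OR zgei:<child-uri> OR ...)"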

      def _compile_event_filter_query (self, events):
            """Takes a list of event templates and compiles a filter query
               based on their interpretations, manifestations, and actors,
               covering both events and subjects.
               
               All fields within the same event will be ANDed and each template
               will be ORed with the others. Like elsewhere in Zeitgeist, the
               type tree of the interpretations and manifestations will be
               expanded to match all child symbols as well.
            """
            query = []
            for event in events:
                  if not isinstance(event, Event):
                        raise TypeError("Expected Event. Found %s" % type(event))
                  
                  tmpl = []
                  if event.interpretation :
                        tmpl.append(self._expand_type("zgei", event.interpretation))
                  if event.manifestation :
                        tmpl.append(self._expand_type("zgem", event.manifestation))
                  if event.actor : tmpl.append("zga:"+event.actor)
                  for su in event.subjects:
                        if su.interpretation :
                              tmpl.append(self._expand_type("zgsi", su.interpretation))
                        if su.manifestation :
                              tmpl.append(self._expand_type("zgsm", su.manifestation))
                  
                  tmpl = "(" + ") AND (".join(tmpl) + ")"
                  query.append(tmpl)
            
            return " OR ".join(query)
      
      def _compile_time_range_filter_query (self, time_range):
            """Takes a TimeRange and compiles a range query for it"""
            
            if not isinstance(time_range, TimeRange):
                  raise TypeError("Expected TimeRange, but found %s" % type(time_range))
            
            return "%s..%sms" % (time_range.begin, time_range.end)

if __name__ == "__main__":
      indexer = Indexer(None)
      print indexer._compile_event_filter_query([Event.new_for_values(subject_interpretation="http://www.semanticdesktop.org/ontologies/2007/03/22/nfo#Document")])
