###############################################################################
# Date: Fri Aug 17 02:18:57 CDT 2007
# Author/s: John Quigley
# Revision: $Id$
###############################################################################

import re
import sys
import logging
import os.path
import traceback
from threading import Thread

from pyinotify import WatchManager
from pyinotify import Notifier
from pyinotify import EventsCodes
from pyinotify import ProcessEvent

from bsddb.db import *
from dbxml import *
from _dbxml import *

from search.util import *
from search.exceptions import *

# TODO
# 1. implement periodic dbxml container flushings

#------------------------------------------------------------------------------
# Engine interface

class Engine(object):
    """Abstract interface for a corpus indexing / search engine.

    The base class only stores configuration and provides the corpus walking
    helper; initialize(), indexFile(), indexCorpus(), updateFile() and find()
    raise NotImplementedError until a subclass overrides them.
    """

    logger = logging.getLogger('Engine')

    def __init__(self, container_path, query_class, filter_fun):
        """Stores the engine configuration

        @arg container_path Path kept as self.container_path
        @arg query_class    Class instantiated with no arguments; the bound
                            transform method of that instance is kept as
                            self.transformer
        @arg filter_fun     Callable taking a path; a truthy result selects
                            the path in _walkCorpus
        """
        self.filter_fun = filter_fun
        self.container_path = container_path
        self.transformer = query_class().transform

    def initialize(self):
        """Prepares the engine for use; must be overridden"""
        raise NotImplementedError('interface must be implemented')

    def indexFile(self, path=None):
        """Indexes a single file; must be overridden

        @arg path Path of the file to index (optional only so that older
                  zero-argument calls still reach the NotImplementedError)
        """
        raise NotImplementedError('interface must be implemented')

    def indexCorpus(self, corpus_path=None):
        """Indexes every selected file below a root; must be overridden

        @arg corpus_path Root directory of the corpus
        """
        raise NotImplementedError('interface must be implemented')

    def updateFile(self, path=None):
        """Re-indexes a file that changed on disk; must be overridden

        @arg path Path of the file to update
        """
        raise NotImplementedError('interface must be implemented')

    def find(self, query):
        """Executes a query against the index; must be overridden"""
        raise NotImplementedError('interface must be implemented')

    def _walkCorpus(self, root_path):
        """Collects the paths below root_path accepted by self.filter_fun

        Every entry of every visited directory is offered to the filter,
        sub-directory names included, so the filter alone decides what ends
        up in the corpus.

        @arg root_path Directory at which the walk starts
        @return        List of accepted paths
        """
        accept = self.filter_fun
        corpus = list()
        for dname, subdirs, fnames in os.walk(root_path):
            for name in subdirs + fnames:
                # os.path.join avoids a doubled separator when the root
                # already ends with one
                path = os.path.join(dname, name)
                if accept(path):
                    corpus.append(path)
        self.logger.debug('successfully walked the corpus rooted at: ' + root_path)
        #self.logger.trace("discovered corpus: " + repr(corpus))
        return corpus
#------------------------------------------------------------------------------ # Oracle DBXML engine implementation class DbxmlEngine(Engine): logger = logging.getLogger('DbxmlEngine') def __init__(self, container_path, query_type, filter_fun): """Constructs a DbxmlEngine @arg container_path Path to dbxml container @arg query_type A search.query class """ Engine.__init__(self, container_path, query_type, filter_fun) self.uc = None self.mgr = None self.corpus = list() self.container = None self.documents = dict() self._queryCache = dict() self.isInitialized = False self.filter_fun = filter_fun # TODO: make this configurable from the higher layer self._cache_enabled = False def initialize(self): self.mgr = XmlManager() self.uc = self.mgr.createUpdateContext() self.container = self.mgr.openContainer(self.container_path, DB_CREATE) self.logger.debug('opened dbxml container: ' + self.container_path) self.logger.debug('successfully loaded index') self.isInitialized = True def find(self, dict_query): if self._queryCache.has_key(repr(dict_query)) and self._cache_enabled: self.logger.debug('query found in cache: ' + repr(dict_query)) return self._queryCache.get(repr(dict_query)) qc = self.mgr.createQueryContext() xpath_query = self.transformer(HotAttrDict(dict_query)) dbxml_query = "collection('dbxml:///%s')%s" % (self.container_path, xpath_query) results = list() exec_success = False try: #self.logger.trace('executing query: '+ dbxml_query) results = self.mgr.query(dbxml_query, qc) exec_success = True except XmlQueryParserError, e: self.logger.warn('encountered problem while parsing query') retval = '' num_results = 0 for v in results: retval += v.asString() + '\n' num_results += 1 if exec_success: self.logger.debug('successfully executed xpath query, results: %d' % num_results) if self._cache_enabled: self._queryCache[repr(dict_query)] = retval return retval def updateFile(self, path): try: doc = self.container.getDocument(path, 0) f = file(path) doc.setContent(f.read()) 
f.close() self.container.updateDocument(doc, self.uc) self.logger.debug('successfully indexed updated file: ' + path) except XmlParserError, e: self.logger.error('parsing error while attempting to update index: ' + e.args[1]) except XmlDocumentNotFound, e: self.logger.error('document not found in index while attempting to update index: ' + e.args[1]) self.logger.warn('indexing as new document: ' + path) self.indexFile(path) except Exception, e: self.logger.error('caught an unhandled exception: ' + repr(sys.exc_info())) def indexCorpus(self, corpus_path): self.corpus = self._walkCorpus(corpus_path) new_docs = 0 total_docs = 0 for path in self.corpus: try: self.indexFile(path) new_docs += 1 logmsg = 'indexing document: ' except DocumentExistsInIndexError, e: logmsg = 'skipping indexed document: ' total_docs += 1 #self.logger.trace(logmsg + os.path.basename(path)) self.logger.debug('total visited documents: %d' % total_docs) self.logger.debug('indexed new documents: %d' % new_docs) self.logger.debug('successfully indexed the corpus') def indexFile(self, path): f = file(path) try: self.container.putDocument(path, f.read(), self.uc) except XmlUniqueError, e: raise DocumentExistsInIndexError, 'document exists in index' f.close() def sync(self): self.container.sync() self.logger.debug('synchronized container with disk') #------------------------------------------------------------------------------ # Query interface class Query(object): def transform(self, query_dict): """Transforms the query dictionary into an Xpath query """ raise NotImplementedError, 'interface must be implemented' #------------------------------------------------------------------------------ # XpathQuery interface class XpathQuery(Query): pass #------------------------------------------------------------------------------ # CorpusWatcher interface class CorpusWatcher(Thread): def __init__(self, corpus_path, engine, filter_fun, *args, **kwds): Thread.__init__(self, name='CorpusWatcher-Thread') 
self.engine = engine self.filter_fun = filter_fun self.corpus_path = corpus_path def run(self): raise NotImplementedError, "interface must be implemented" def stop(self): raise NotImplementedError, "interface must be implemented" # ----------------------------------------------------------------------------- # InotifyWatcher implementation of CorpusWatcher class InotifyWatcher(CorpusWatcher, ProcessEvent): logger = logging.getLogger('InotifyWatcher') def __init__(self, corpus_path, engine, filter_fun, *args, **kwds): CorpusWatcher.__init__(self, corpus_path, engine, filter_fun, args, kwds) self.running = False self.wm = WatchManager() self.mask = EventsCodes.IN_CREATE | EventsCodes.IN_MODIFY # TODO: remove the queueing mechanism and make calls directly self.eventsQ = dict() def run(self): self.logger.debug('successfully spawned watcher thread') notifier = Notifier(self.wm, self) self.wm.add_watch(self.corpus_path, self.mask, rec=True) self.logger.debug('added recursive watch to: ' + self.corpus_path) self.running = True while self.running: # TODO: investigate blocking behavior and thread interrupts if notifier.check_events(timeout=2000): #self.logger.trace('discovered pending event') notifier.read_events() try: notifier.process_events() except Exception, e: self.logger.error('caught an unhandled exception: ' + repr(sys.exc_info())) for fun in self.eventsQ.itervalues(): try: fun() self.logger.debug('executing queued callable') except Exception, e: self.logger.error('caught an unhandled exception: ' + repr(sys.exc_info())) self.eventsQ.clear() notifier.stop() self.logger.debug('purged notifier of outstanding events') self.logger.debug('watcher thread terminating') def stop(self): self.running = False def process_IN_CREATE(self, event): path = os.path.join(event.path, event.name) #self.logger.trace('create: %s' % path) if self.filter_fun(path) and not self.eventsQ.has_key(path): self.logger.debug('event queued for file create: ' + path) self.eventsQ[path] = lambda : 
self.engine.indexFile(path) elif os.path.isdir(path): self.logger.debug('added new directory to watch: ' + path) self.wm.add_watch(path, self.mask, rec=True) def process_IN_MODIFY(self, event): path = os.path.join(event.path, event.name) #self.logger.trace('modify: %s' % path) if self.filter_fun(path) and not self.eventsQ.has_key(path): self.logger.debug('event queued for file update: ' + path) self.eventsQ[path] = lambda : self.engine.updateFile(path) #------------------------------------------------------------------------------ # Tests def test_dbxmlengine(query): from search.chump import ChumpQuery base_path = '/usr/local/share/chump-search' corpus_path = base_path + '/misc/corpus' container_path = base_path + '/test.dbxml' corpus_regex = '\d{4}-\d{2}-\d{2}.xml$' engine = DbxmlEngine(container_path, ChumpQuery) engine.initialize() print "Searching ..." engine.index(corpus_path, corpus_regex) retval = engine.find({'search':query}) print retval def run_tests(test, args): print "Running test: %(test)s" % locals() globals()['test_%(test)s' % locals()](args) #------------------------------------------------------------------------------ # Begin execution if __name__ == '__main__': import sys sys.exit(run_tests(sys.argv[1], sys.argv[2])) # EOF