# # Gramps - a GTK+/GNOME based genealogy program # # Copyright (C) 2000-2007 Donald N. Allingham # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # test/GrampsDb/GrampsDbBase_Test.py # $Id$ import unittest import logging import os import tempfile import shutil import time import traceback import sys sys.path.append(os.path.join(os.path.dirname(__file__), '../../gramps')) try: set() except NameError: from sets import Set as set import gramps.gen.lib logger = logging.getLogger('Gramps.GrampsDbBase_Test') from GrampsDbTestBase import GrampsDbBaseTest class ReferenceMapTest (GrampsDbBaseTest): """Test methods on the GrampsDbBase class that are related to the reference_map index implementation.""" def test_simple_lookup(self): """insert a record and a reference and check that a lookup for the reference returns the original record.""" source = self._add_source() person = self._add_person_with_sources([source]) references = list(self._db.find_backlink_handles(source.get_handle())) assert len(references) == 1 assert references[0] == (gen.lib.Person.__name__,person.get_handle()) def test_backlink_for_repository(self): """check that the source / repos backlink lookup works.""" repos = self._add_repository() source = self._add_source(repos=repos) references = list(self._db.find_backlink_handles(repos.get_handle())) assert len(references) == 1 assert references[0] == (gen.lib.Source.__name__,source.get_handle()) def test_class_limited_lookup(self): """check that class limited lookups work.""" source = self._add_source() person = self._add_person_with_sources([source]) self._add_family_with_sources([source]) self._add_event_with_sources([source]) self._add_place_with_sources([source]) self._add_media_object_with_sources([source]) # make sure that we have the correct number of references (one for each object) references = list(self._db.find_backlink_handles(source.get_handle())) assert len(references) == 5, "len(references) == %s " % str(len(references)) # should just return the person reference references = [ ref for ref in self._db.find_backlink_handles(source.get_handle(),(gen.lib.Person.__name__,)) ] assert len(references) == 1, "len(references) == %s " % str(len(references)) assert references[0][0] == gen.lib.Person.__name__, "references = %s" % repr(references) # should just return the person and event reference references = list(self._db.find_backlink_handles(source.get_handle(), (gen.lib.Person.__name__, gen.lib.Event.__name__))) assert len(references) == 2, "len(references) == %s " % str(len(references)) assert references[0][0] == gen.lib.Person.__name__, "references = %s" % repr(references) assert references[1][0] == gen.lib.Event.__name__, "references = %s" % repr(references) def test_delete_primary(self): """check that deleting a primary will remove the backreferences from the reference_map""" source = self._add_source() person = self._add_person_with_sources([source]) assert self._db.get_person_from_handle(person.get_handle()) is not None tran = self._db.transaction_begin() self._db.remove_person(person.get_handle(),tran) self._db.transaction_commit(tran, "Del Person") assert self._db.get_person_from_handle(person.get_handle()) is None references = list(self._db.find_backlink_handles(source.get_handle())) assert len(references) == 0, "len(references) == %s " % str(len(references)) def test_reindex_reference_map(self): """Test that the reindex function works.""" def cb(count): pass # unhook the reference_map update function so that we # can insert some records without the reference_map being updated. update_method = self._db.update_reference_map self._db._update_reference_map = lambda x,y: 1 # Insert a person/source pair. source = self._add_source() person = self._add_person_with_sources([source]) # Check that the reference map does not contain the reference. references = list(self._db.find_backlink_handles(source.get_handle())) assert len(references) == 0, "len(references) == %s " % str(len(references)) # Reinstate the reference_map method and reindex the database self._db._update_reference_map = update_method self._db.reindex_reference_map(cb) # Check that the reference now appears in the reference_map references = list(self._db.find_backlink_handles(source.get_handle())) assert len(references) == 1, "len(references) == %s " % str(len(references)) def perf_simple_search_speed(self): num_sources = 100 num_persons = 1000 num_families = 10 num_events = 10 num_places = 10 num_media_objects = 10 num_links = 10 self._populate_database(num_sources, num_persons, num_families, num_events, num_places, num_media_objects, num_links) # time searching for source backrefs with and without reference_map cur = self._db.get_source_cursor() handle,data = cur.first() cur.close() start = time.time() references = list(self._db.find_backlink_handles(handle)) end = time.time() with_reference_map = end - start remember = self._db.__class__.find_backlink_handles self._db.__class__.find_backlink_handles = self._db.__class__.__base__.find_backlink_handles start = time.time() references = list(self._db.find_backlink_handles(handle)) end = time.time() without_reference_map = end - start self._db.__class__.find_backlink_handles = remember logger.info("search test with following data: \n" "num_sources = %d \n" "num_persons = %d \n" "num_families = %d \n" "num_events = %d \n" "num_places = %d \n" "num_media_objects = %d \n" "num_links = %d" % (num_sources, num_persons, num_families, num_events, num_places, num_media_objects, num_links)) logger.info("with refs %s\n", str(with_reference_map)) logger.info("without refs %s\n", str(without_reference_map)) assert with_reference_map < (without_reference_map / 10), "Reference_map should an order of magnitude faster." def testSuite(): suite = unittest.makeSuite(ReferenceMapTest,'test') return suite def perfSuite(): return unittest.makeSuite(ReferenceMapTest,'perf') if __name__ == '__main__': unittest.TextTestRunner().run(testSuite())