252 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			252 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import logging
 | |
| import os
 | |
| import tempfile
 | |
| import shutil
 | |
| import time
 | |
| import traceback
 | |
| import sys
 | |
| 
 | |
| sys.path.append('../../src')
 | |
| 
 | |
| try:
 | |
|     set()
 | |
| except NameError:
 | |
|     from sets import Set as set
 | |
| 
 | |
| import const
 | |
| import RelLib
 | |
| 
 | |
| logger = logging.getLogger('Gramps.GrampsDbBase_Test')
 | |
| 
 | |
| from GrampsDbTestBase import GrampsDbBaseTest
 | |
| import GrampsDb
 | |
| 
 | |
| class FactoryTest(unittest.TestCase):
 | |
|     """Test the GrampsDb Factory classes."""
 | |
| 
 | |
|     def test_gramps_db_factory(self):
 | |
|         """test than gramps_db_factory returns the correct classes."""
 | |
|         
 | |
|         cls = GrampsDb.gramps_db_factory(db_type = const.app_gramps)
 | |
|         assert cls.__name__ == "GrampsBSDDB", \
 | |
|                "Returned class is %s " % str(cls.__class__.__name__)
 | |
| 
 | |
|         cls = GrampsDb.gramps_db_factory(db_type = const.app_gramps_xml)
 | |
|         assert cls.__name__ == "GrampsXMLDB", \
 | |
|                "Returned class is %s " % str(cls.__class__.__name__)
 | |
| 
 | |
|         cls = GrampsDb.gramps_db_factory(db_type = const.app_gedcom)
 | |
|         assert cls.__name__ == "GrampsGEDDB", \
 | |
|                "Returned class is %s " % str(cls.__class__.__name__)
 | |
| 
 | |
|         self.assertRaises(GrampsDb.GrampsDbException, GrampsDb.gramps_db_factory, "gibberish")
 | |
| 
 | |
|     def test_gramps_db_writer_factory(self):
 | |
|         """Test that gramps_db_writer_factory returns the correct method."""
 | |
| 
 | |
|         md = GrampsDb.gramps_db_writer_factory(db_type = const.app_gramps)
 | |
|         assert callable(md), "Returned method is %s " % str(md)
 | |
| 
 | |
|         md = GrampsDb.gramps_db_writer_factory(db_type = const.app_gramps_xml)
 | |
|         assert callable(md), "Returned method is %s " % str(md)
 | |
| 
 | |
|         md = GrampsDb.gramps_db_writer_factory(db_type = const.app_gedcom)
 | |
|         assert callable(md), "Returned method is %s " % str(md)
 | |
| 
 | |
|         self.assertRaises(GrampsDb.GrampsDbException, GrampsDb.gramps_db_writer_factory, "gibberish")
 | |
| 
 | |
|     def test_gramps_db_reader_factory(self):
 | |
|         """Test that gramps_db_reader_factory returns the correct method."""
 | |
| 
 | |
|         md = GrampsDb.gramps_db_reader_factory(db_type = const.app_gramps)
 | |
|         assert callable(md), "Returned method is %s " % str(md)
 | |
| 
 | |
|         md = GrampsDb.gramps_db_reader_factory(db_type = const.app_gramps_xml)
 | |
|         assert callable(md), "Returned method is %s " % str(md)
 | |
| 
 | |
|         md = GrampsDb.gramps_db_reader_factory(db_type = const.app_gedcom)
 | |
|         assert callable(md), "Returned method is %s " % str(md)
 | |
| 
 | |
|         self.assertRaises(GrampsDb.GrampsDbException, GrampsDb.gramps_db_reader_factory, "gibberish")
 | |
| 
 | |
|         
 | |
|         
 | |
| class ReferenceMapTest (GrampsDbBaseTest):
 | |
|     """Test methods on the GrampsDbBase class that are related to the reference_map
 | |
|     index implementation."""
 | |
| 
 | |
|     def test_simple_lookup(self):
 | |
|         """insert a record and a reference and check that
 | |
|         a lookup for the reference returns the original
 | |
|         record."""
 | |
| 
 | |
|         source = self._add_source()
 | |
|         person = self._add_person_with_sources([source])
 | |
|         
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ]
 | |
| 
 | |
|         assert len(references) == 1
 | |
|         assert references[0] == (RelLib.Person.__name__,person.get_handle())
 | |
| 
 | |
|     def test_backlink_for_repository(self):
 | |
|         """check that the source / repos backlink lookup works."""
 | |
| 
 | |
|         repos = self._add_repository()
 | |
|         source = self._add_source(repos=repos)
 | |
|         
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(repos.get_handle()) ]
 | |
| 
 | |
|         assert len(references) == 1
 | |
|         assert references[0] == (RelLib.Source.__name__,source.get_handle())
 | |
| 
 | |
|     def test_class_limited_lookup(self):
 | |
|         """check that class limited lookups work."""
 | |
| 
 | |
|         source = self._add_source()
 | |
|         person = self._add_person_with_sources([source])
 | |
| 
 | |
|         self._add_family_with_sources([source])
 | |
|         self._add_event_with_sources([source])
 | |
|         self._add_place_with_sources([source])
 | |
|         self._add_media_object_with_sources([source])
 | |
| 
 | |
|         # make sure that we have the correct number of references (one for each object)
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ]
 | |
| 
 | |
|         assert len(references) == 5, "len(references) == %s " % str(len(references))
 | |
| 
 | |
|         # should just return the person reference
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle(),(RelLib.Person.__name__,)) ]
 | |
|         assert len(references) == 1, "len(references) == %s " % str(len(references))
 | |
|         assert references[0][0] == RelLib.Person.__name__, "references = %s" % repr(references)
 | |
| 
 | |
|         # should just return the person  and event reference
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle(),(RelLib.Person.__name__,
 | |
|                                                                                           RelLib.Event.__name__)) ]
 | |
|         assert len(references) == 2, "len(references) == %s " % str(len(references))
 | |
|         assert references[0][0] == RelLib.Person.__name__, "references = %s" % repr(references)
 | |
|         assert references[1][0] == RelLib.Event.__name__, "references = %s" % repr(references)
 | |
| 
 | |
|         
 | |
| 
 | |
|     def test_delete_primary(self):
 | |
|         """check that deleting a primary will remove the backreferences
 | |
|         from the reference_map"""
 | |
| 
 | |
|         source = self._add_source()
 | |
|         person = self._add_person_with_sources([source])
 | |
|         
 | |
|         assert self._db.get_person_from_handle(person.get_handle()) is not None
 | |
|         
 | |
|         tran = self._db.transaction_begin()
 | |
|         self._db.remove_person(person.get_handle(),tran)
 | |
|         self._db.transaction_commit(tran, "Del Person")
 | |
| 
 | |
|         assert self._db.get_person_from_handle(person.get_handle()) == None
 | |
|         
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ]
 | |
| 
 | |
|         assert len(references) == 0, "len(references) == %s " % str(len(references))
 | |
| 
 | |
| 
 | |
|     def test_reindex_reference_map(self):
 | |
|         """Test that the reindex function works."""
 | |
| 
 | |
|         # unhook the reference_map update function so that we
 | |
|         # can insert some records without the reference_map being updated.
 | |
|         update_method = self._db._update_reference_map
 | |
|         self._db._update_reference_map = lambda x: 1
 | |
| 
 | |
|         # Insert a person/source pair.
 | |
|         source = self._add_source()
 | |
|         person = self._add_person_with_sources([source])
 | |
| 
 | |
|         # Check that the reference map does not contain the reference.
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ]
 | |
| 
 | |
|         assert len(references) == 0, "len(references) == %s " % str(len(references))
 | |
| 
 | |
|         # Reinstate the reference_map method and reindex the database
 | |
|         self._db._update_reference_map = update_method
 | |
|         self._db.reindex_reference_map()
 | |
| 
 | |
|         # Check that the reference now appears in the reference_map
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ]
 | |
| 
 | |
|         assert len(references) == 1, "len(references) == %s " % str(len(references))
 | |
| 
 | |
|         
 | |
|     
 | |
|     def perf_simple_search_speed(self):
 | |
| 
 | |
|         num_sources = 100
 | |
|         num_persons = 1000
 | |
|         num_families = 10
 | |
|         num_events = 10
 | |
|         num_places = 10
 | |
|         num_media_objects = 10
 | |
|         num_links = 10
 | |
|         
 | |
|         self._populate_database(num_sources,
 | |
|                                 num_persons,
 | |
|                                 num_families,
 | |
|                                 num_events,
 | |
|                                 num_places,
 | |
|                                 num_media_objects,
 | |
|                                 num_links)
 | |
| 
 | |
| 
 | |
|         # time searching for source backrefs with and without reference_map
 | |
|         cur = self._db.get_source_cursor()
 | |
|         handle,data = cur.first()
 | |
|         cur.close()
 | |
| 
 | |
|         start = time.time()
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(handle) ]
 | |
|         end = time.time()
 | |
| 
 | |
|         with_reference_map = end - start
 | |
| 
 | |
|         remember = self._db.__class__.find_backlink_handles
 | |
|         
 | |
|         self._db.__class__.find_backlink_handles = self._db.__class__.__base__.find_backlink_handles
 | |
| 
 | |
|         start = time.time()
 | |
|         references = [ ref for ref in self._db.find_backlink_handles(handle) ]
 | |
|         end = time.time()
 | |
| 
 | |
|         without_reference_map = end - start
 | |
|                 
 | |
|         self._db.__class__.find_backlink_handles = remember
 | |
| 
 | |
|         logger.info("search test with following data: \n"
 | |
|                     "num_sources = %d \n"                    
 | |
|                     "num_persons = %d \n"
 | |
|                     "num_families = %d \n"
 | |
|                     "num_events = %d \n"
 | |
|                     "num_places = %d \n"
 | |
|                     "num_media_objects = %d \n"
 | |
|                     "num_links = %d" % (num_sources,
 | |
|                                         num_persons,
 | |
|                                         num_families,
 | |
|                                         num_events,
 | |
|                                         num_places,
 | |
|                                         num_media_objects,
 | |
|                                         num_links))
 | |
|         logger.info("with refs %s\n", str(with_reference_map))
 | |
|         logger.info("without refs %s\n", str(without_reference_map))
 | |
| 
 | |
|         assert with_reference_map < (without_reference_map / 10), "Reference_map should an order of magnitude faster."
 | |
|         
 | |
| def testSuite():
 | |
|     suite = unittest.makeSuite(ReferenceMapTest,'test')
 | |
|     suite.addTests(unittest.makeSuite(FactoryTest,'test'))
 | |
|     return suite
 | |
| 
 | |
| def perfSuite():
 | |
|     return unittest.makeSuite(ReferenceMapTest,'perf')
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.TextTestRunner().run(testSuite())
 |