DB-API: missing BEGIN TRANSACTION for batch mode

This commit is contained in:
Doug Blank 2016-04-16 02:32:02 -04:00
parent 98ee53aa17
commit 887490ea5b
4 changed files with 23 additions and 33 deletions

View File

@ -271,6 +271,14 @@ class DBAPI(DbGeneric):
def close_backend(self): def close_backend(self):
self.dbapi.close() self.dbapi.close()
def transaction_begin(self, transaction):
"""
Transactions are handled automatically by the db layer.
"""
self.transaction = transaction
self.dbapi.begin()
return transaction
def transaction_commit(self, txn): def transaction_commit(self, txn):
""" """
Executed after a batch operation. Executed after a batch operation.
@ -320,7 +328,6 @@ class DBAPI(DbGeneric):
else: else:
self.dbapi.execute("INSERT INTO metadata (setting, value) VALUES (?, ?);", self.dbapi.execute("INSERT INTO metadata (setting, value) VALUES (?, ?);",
[key, pickle.dumps(value)]) [key, pickle.dumps(value)])
self.dbapi.commit()
def get_name_group_keys(self): def get_name_group_keys(self):
self.dbapi.execute("SELECT name FROM name_group ORDER BY name;") self.dbapi.execute("SELECT name FROM name_group ORDER BY name;")
@ -496,7 +503,6 @@ class DBAPI(DbGeneric):
[name]) [name])
self.dbapi.execute("INSERT INTO name_group (name, grouping) VALUES(?, ?);", self.dbapi.execute("INSERT INTO name_group (name, grouping) VALUES(?, ?);",
[name, grouping]) [name, grouping])
self.dbapi.commit()
def commit_person(self, person, trans, change_time=None): def commit_person(self, person, trans, change_time=None):
emit = None emit = None
@ -549,7 +555,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(person) self.update_secondary_values(person)
if not trans.batch: if not trans.batch:
self.update_backlinks(person) self.update_backlinks(person)
self.dbapi.commit()
if old_person: if old_person:
trans.add(PERSON_KEY, TXNUPD, person.handle, trans.add(PERSON_KEY, TXNUPD, person.handle,
old_person.serialize(), old_person.serialize(),
@ -618,7 +623,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(family) self.update_secondary_values(family)
if not trans.batch: if not trans.batch:
self.update_backlinks(family) self.update_backlinks(family)
self.dbapi.commit()
op = TXNUPD if old_family else TXNADD op = TXNUPD if old_family else TXNADD
trans.add(FAMILY_KEY, op, family.handle, trans.add(FAMILY_KEY, op, family.handle,
old_family, old_family,
@ -679,7 +683,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(citation) self.update_secondary_values(citation)
if not trans.batch: if not trans.batch:
self.update_backlinks(citation) self.update_backlinks(citation)
self.dbapi.commit()
op = TXNUPD if old_citation else TXNADD op = TXNUPD if old_citation else TXNADD
trans.add(CITATION_KEY, op, citation.handle, trans.add(CITATION_KEY, op, citation.handle,
old_citation, old_citation,
@ -725,7 +728,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(source) self.update_secondary_values(source)
if not trans.batch: if not trans.batch:
self.update_backlinks(source) self.update_backlinks(source)
self.dbapi.commit()
op = TXNUPD if old_source else TXNADD op = TXNUPD if old_source else TXNADD
trans.add(SOURCE_KEY, op, source.handle, trans.add(SOURCE_KEY, op, source.handle,
old_source, old_source,
@ -768,7 +770,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(repository) self.update_secondary_values(repository)
if not trans.batch: if not trans.batch:
self.update_backlinks(repository) self.update_backlinks(repository)
self.dbapi.commit()
op = TXNUPD if old_repository else TXNADD op = TXNUPD if old_repository else TXNADD
trans.add(REPOSITORY_KEY, op, repository.handle, trans.add(REPOSITORY_KEY, op, repository.handle,
old_repository, old_repository,
@ -804,7 +805,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(note) self.update_secondary_values(note)
if not trans.batch: if not trans.batch:
self.update_backlinks(note) self.update_backlinks(note)
self.dbapi.commit()
op = TXNUPD if old_note else TXNADD op = TXNUPD if old_note else TXNADD
trans.add(NOTE_KEY, op, note.handle, trans.add(NOTE_KEY, op, note.handle,
old_note, old_note,
@ -842,7 +842,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(place) self.update_secondary_values(place)
if not trans.batch: if not trans.batch:
self.update_backlinks(place) self.update_backlinks(place)
self.dbapi.commit()
op = TXNUPD if old_place else TXNADD op = TXNUPD if old_place else TXNADD
trans.add(PLACE_KEY, op, place.handle, trans.add(PLACE_KEY, op, place.handle,
old_place, old_place,
@ -886,7 +885,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(event) self.update_secondary_values(event)
if not trans.batch: if not trans.batch:
self.update_backlinks(event) self.update_backlinks(event)
self.dbapi.commit()
op = TXNUPD if old_event else TXNADD op = TXNUPD if old_event else TXNADD
trans.add(EVENT_KEY, op, event.handle, trans.add(EVENT_KEY, op, event.handle,
old_event, old_event,
@ -926,7 +924,6 @@ class DBAPI(DbGeneric):
pickle.dumps(tag.serialize())]) pickle.dumps(tag.serialize())])
if not trans.batch: if not trans.batch:
self.update_backlinks(tag) self.update_backlinks(tag)
self.dbapi.commit()
# Emit after added: # Emit after added:
if emit: if emit:
self.emit(emit, ([tag.handle],)) self.emit(emit, ([tag.handle],))
@ -957,7 +954,6 @@ class DBAPI(DbGeneric):
self.update_secondary_values(media) self.update_secondary_values(media)
if not trans.batch: if not trans.batch:
self.update_backlinks(media) self.update_backlinks(media)
self.dbapi.commit()
op = TXNUPD if old_media else TXNADD op = TXNUPD if old_media else TXNADD
trans.add(MEDIA_KEY, op, media.handle, trans.add(MEDIA_KEY, op, media.handle,
old_media, old_media,
@ -1001,7 +997,6 @@ class DBAPI(DbGeneric):
self.dbapi.execute("DELETE FROM person WHERE handle = ?;", [handle]) self.dbapi.execute("DELETE FROM person WHERE handle = ?;", [handle])
self.emit("person-delete", ([handle],)) self.emit("person-delete", ([handle],))
if not transaction.batch: if not transaction.batch:
self.dbapi.commit()
transaction.add(PERSON_KEY, TXNDEL, person.handle, transaction.add(PERSON_KEY, TXNDEL, person.handle,
person.serialize(), None) person.serialize(), None)
@ -1027,7 +1022,6 @@ class DBAPI(DbGeneric):
[handle]) [handle])
self.emit(KEY_TO_NAME_MAP[key] + "-delete", ([handle],)) self.emit(KEY_TO_NAME_MAP[key] + "-delete", ([handle],))
if not transaction.batch: if not transaction.batch:
self.dbapi.commit()
data = data_map[handle] data = data_map[handle]
transaction.add(key, TXNDEL, handle, data, None) transaction.add(key, TXNDEL, handle, data, None)
@ -1206,8 +1200,6 @@ class DBAPI(DbGeneric):
obj.__class__.__name__, obj.__class__.__name__,
ref_handle, ref_handle,
ref_class_name]) ref_class_name])
self.dbapi.commit()
callback(5) callback(5)
def rebuild_secondary(self, update): def rebuild_secondary(self, update):
@ -1221,7 +1213,6 @@ class DBAPI(DbGeneric):
cur2 = self.dbapi.execute("""UPDATE place SET order_by = ? WHERE handle = ?;""", cur2 = self.dbapi.execute("""UPDATE place SET order_by = ? WHERE handle = ?;""",
[order_by, place.handle]) [order_by, place.handle])
row = self.dbapi.fetchone() row = self.dbapi.fetchone()
self.dbapi.commit()
def has_handle_for_person(self, key): def has_handle_for_person(self, key):
if isinstance(key, bytes): if isinstance(key, bytes):
@ -1544,7 +1535,6 @@ class DBAPI(DbGeneric):
self.dbapi.execute("""INSERT INTO gender_stats(given_name, female, male, unknown) self.dbapi.execute("""INSERT INTO gender_stats(given_name, female, male, unknown)
VALUES(?, ?, ?, ?);""", VALUES(?, ?, ?, ?);""",
[key, female, male, unknown]); [key, female, male, unknown]);
self.dbapi.commit()
def get_surname_list(self): def get_surname_list(self):
self.dbapi.execute("""SELECT DISTINCT surname FROM person ORDER BY surname;""") self.dbapi.execute("""SELECT DISTINCT surname FROM person ORDER BY surname;""")
@ -1640,7 +1630,6 @@ class DBAPI(DbGeneric):
altered = True altered = True
if altered: if altered:
LOG.info("Table %s is being committed, rebuilt, and indexed..." % table) LOG.info("Table %s is being committed, rebuilt, and indexed..." % table)
self.dbapi.commit()
self.update_secondary_values_table(table) self.update_secondary_values_table(table)
self.create_secondary_indexes_table(table) self.create_secondary_indexes_table(table)
@ -1675,13 +1664,11 @@ class DBAPI(DbGeneric):
Go through all items in a table, and update their secondary Go through all items in a table, and update their secondary
field values. field values.
table - "Person", "Place", "Media", etc. table - "Person", "Place", "Media", etc.
Commits changes.
""" """
if not hasattr(self.get_table_func(table,"class_func"), "get_secondary_fields"): if not hasattr(self.get_table_func(table,"class_func"), "get_secondary_fields"):
return return
for item in self.get_table_func(table,"iter_func")(): for item in self.get_table_func(table,"iter_func")():
self.update_secondary_values(item) self.update_secondary_values(item)
self.dbapi.commit()
def update_secondary_values(self, item): def update_secondary_values(self, item):
""" """

View File

@ -5,11 +5,11 @@ MySQLdb.paramstyle = 'qmark' ## Doesn't work
class MySQL(object): class MySQL(object):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection = MySQLdb.connect(*args, **kwargs) self.connection = MySQLdb.connect(*args, **kwargs)
self.cursor = self.connection.cursor()
def execute(self, query, args=[]): def execute(self, query, args=[]):
## Workaround: no qmark support ## Workaround: no qmark support
query = query.replace("?", "%s") query = query.replace("?", "%s")
self.cursor = self.connection.cursor()
self.cursor.execute(query, args) self.cursor.execute(query, args)
def fetchone(self): def fetchone(self):
@ -26,11 +26,9 @@ class MySQL(object):
def try_execute(self, sql): def try_execute(self, sql):
try: try:
cursor = self.connection.cursor() self.cursor.execute(sql)
cursor.execute(sql)
self.connection.commit()
except Exception as exc: except Exception as exc:
self.connection.rollback() pass
#print(str(exc)) #print(str(exc))
def close(self): def close(self):

View File

@ -5,9 +5,9 @@ pg8000.paramstyle = 'qmark'
class Postgresql(object): class Postgresql(object):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection = pg8000.connect(*args, **kwargs) self.connection = pg8000.connect(*args, **kwargs)
self.cursor = self.connection.cursor()
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
self.cursor = self.connection.cursor()
self.cursor.execute(*args, **kwargs) self.cursor.execute(*args, **kwargs)
def fetchone(self): def fetchone(self):
@ -25,11 +25,9 @@ class Postgresql(object):
def try_execute(self, sql): def try_execute(self, sql):
sql = sql.replace("BLOB", "bytea") sql = sql.replace("BLOB", "bytea")
try: try:
cursor = self.connection.cursor() self.cursor.execute(sql)
cursor.execute(sql)
self.connection.commit()
except Exception as exc: except Exception as exc:
self.connection.rollback() pass
#print(str(exc)) #print(str(exc))
def close(self): def close(self):

View File

@ -8,11 +8,12 @@ class Sqlite(object):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.log = logging.getLogger(".sqlite") self.log = logging.getLogger(".sqlite")
self.connection = sqlite3.connect(*args, **kwargs) self.connection = sqlite3.connect(*args, **kwargs)
self.cursor = self.connection.cursor()
self.queries = {} self.queries = {}
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
self.log.debug(args) self.log.debug(args)
self.cursor = self.connection.execute(*args, **kwargs) self.cursor.execute(*args, **kwargs)
def fetchone(self): def fetchone(self):
return self.cursor.fetchone() return self.cursor.fetchone()
@ -20,18 +21,24 @@ class Sqlite(object):
def fetchall(self): def fetchall(self):
return self.cursor.fetchall() return self.cursor.fetchall()
def begin(self):
self.execute("BEGIN TRANSACTION;")
def commit(self): def commit(self):
self.log.debug("COMMIT;")
self.connection.commit() self.connection.commit()
def rollback(self): def rollback(self):
self.log.debug("ROLLBACK;")
self.connection.rollback() self.connection.rollback()
def try_execute(self, sql): def try_execute(self, sql):
try: try:
self.connection.execute(sql) self.cursor.execute(sql)
except Exception as exc: except Exception as exc:
#print(str(exc)) #print(str(exc))
pass pass
def close(self): def close(self):
self.log.debug("closing database...")
self.connection.close() self.connection.close()