import re
import argparse
import sqlite3 as sql
import logging
import logging.handlers
from os import mkdir
from os.path import exists
from urllib.parse import unquote


#------------------------------------------------+
# get_list_from_server_txt
#------------------------------------------------+
# Rows in the game server database are
# occasionally concatenated into one line.
# To simplify, they are unmerged.
#
# The final result is every row of the game server
# database at its own index in a list.
#------------------------------------------------+
def get_list_from_server_txt(filename):
    def unmerge_rows(line, char, x):
        chunks = line.split(char)
        newrows = [char.join(chunks[:x]), char.join(chunks[x:])]
        # Each row needs to be prefixed with the separator char;
        # only the last row is missing it after the split.
        newrows[-1] = char + newrows[-1]
        if newrows[-1].count(char) > (x - 1):
            newrows += unmerge_rows(newrows.pop(), char, x)
        return newrows

    rows = []
    with open(filename, 'r') as f:
        # The server database has a lot of blank lines to ignore.
        rows = [line for line in f if line != "\n"]
    output = []
    n = 3
    backslash = '\\'
    for row in rows:
        # The first and last columns are prefixed with a backslash,
        # so multiple rows on one line should be split at the 3rd backslash.
        if row.count(backslash) > (n - 1):
            unmerged = unmerge_rows(row, backslash, n)
            for u in unmerged:
                output.append(u)
        else:
            output.append(row)
    return output


def init_logging():
    # Create the log directory on first use, then pick the
    # first unused log file name.
    if not exists("_logs"):
        mkdir("_logs")
    filename = "_logs/dbimport-%s.log"
    i = 0
    while exists(filename % i):
        i += 1
    filename = filename % i
    f = open(filename, mode='a', encoding='utf-8')
    logging.basicConfig(stream=f, level=logging.DEBUG)
    return filename


#------------------------------------------------+
# Functions: Clean up.
#------------------------------------------------+
# Unlike other rows,
# the separator character '/' is part of the value of the second column,
# so an ordinary match for '/' or '\' cannot be done as for the other row types.
# Example from the game server db:
# \/uid2name/Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=\Tuxxy
# It should become:
# ["uid2name", "Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=", "Tuxxy"]
def uid2namefix(row):
    # Quick fix:
    # replace the first and last occurrence of backslash.
    # This results in [,/uid2name/cryptoid_fp, name]
    e = re.sub(r'^([^\\]*)\\|\\(?=[^\\]*$)', ',', row)
    # Replace the first two occurrences of forward slash.
    # This results in [,,uid2name,cryptoid_fp, name]
    ee = e.replace('/', ',', 2)
    # Split on comma, but start from index 2 because the
    # first two commas are left over from the replacements.
    # c is now a list of three strings:
    # ["uid2name", cryptoid_fp, name]
    c = ee[2:].split(',')
    c[2] = unquote(c[2])
    c[2] = c[2].strip('\n')
    return c
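
#------------------------------------------------+
# Minimal self-check sketch for uid2namefix, using the example row quoted
# above. It is not called anywhere by the importer; the function name
# _check_uid2namefix is made up here purely for illustration and can be
# removed freely.
#------------------------------------------------+
def _check_uid2namefix():
    row = "\\/uid2name/Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=\\Tuxxy\n"
    expected = ["uid2name", "Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=", "Tuxxy"]
    assert uid2namefix(row) == expected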

# O(n); organizes cts-related data into lists of rows.
def filters(db):
    tt = []   # time (seconds)
    tr = []   # ranks
    ti = []   # id
    # Xonotic only stores one player per map
    # for speed records (fastest player only).
    s = []    # speed
    sid = []  # speed id
    rank_index = 2
    for d in db:
        if d.find("uid2name") != -1:
            ti.append(uid2namefix(d))
        else:
            # Regex: find substrings that do not contain
            # a backslash, forward slash, or newline.
            e = re.findall(r'[^\\/\n]+', d)
            if d.find("cts100record/time") != -1:
                e[rank_index] = int(e[rank_index].replace("time", ""))
                tt.append(e)
            elif d.find("cts100record/crypto_idfp") != -1:
                e[3] = unquote(e[3])
                e[rank_index] = int(e[rank_index].replace("crypto_idfp", ""))
                tr.append(e)
            elif d.find("cts100record/speed/speed") != -1:
                # example:
                # ['zeel-omnitek', 'cts100record', 'speed', 'speed', '1584.598511']
                # --- note: indexes 1, 2, 3 are unneeded
                s.append([e[0], unquote(e[-1])])
            elif d.find("cts100record/speed/crypto_idfp") != -1:
                # example:
                # ['minideck_cts_v4r4', 'cts100record', 'speed', 'crypto_idfp',
                #  'duHTyaSGpdTk7oebwPFoo899xPoTwP9bja4DUjCjTLo%3D']
                sid.append([e[0], unquote(e[-1])])
    return tt, tr, ti, s, sid


#------------------------------------------------+
# Functions: Database Creation
#------------------------------------------------+
def inserttodb(c, q, d):
    for x in d:
        # executemany would also work here, but iterating row by row
        # makes it possible to catch and report the problematic rows
        # while still adding the OK ones.
        try:
            c.execute(q, x)
        except sql.ProgrammingError as e:
            print(e)
            print(x)


#------------------------------------------------+
# Insert new data directly into a new database file.
def i(d, s):
    con = sql.connect(d)
    with con:
        csr = con.cursor()
        try:
            times, ranks, ids, speed, speed_ids = filters(get_list_from_server_txt(s))
            if times:
                inserttodb(csr, "INSERT OR REPLACE INTO Cts_times VALUES(?, ?, ?, ?)", times)
                logging.info('\n'.join(str(x) for x in times))
            if ranks:
                inserttodb(csr, "INSERT OR REPLACE INTO Cts_ranks VALUES(?, ?, ?, ?)", ranks)
                logging.info('\n'.join(str(x) for x in ranks))
            if ids:
                inserttodb(csr, "INSERT OR REPLACE INTO Id2alias VALUES(?, ?, ?)", ids)
                logging.info('\n'.join(str(x) for x in ids))
            if speed:
                inserttodb(csr, "INSERT OR REPLACE INTO Speed VALUES(?, ?)", speed)
            if speed_ids:
                inserttodb(csr, "INSERT OR REPLACE INTO Fastest_players VALUES(?, ?)", speed_ids)
        except sql.Error:
            logging.exception("sql error encountered in function 'i'")
            if con:
                con.rollback()


# 'Insert' new data into a file, i.e. an sql query file.
# Note: values are written into the statements as-is, without SQL quoting.
def f(d, s):
    with open(d, 'w', encoding='utf-8') as h:
        times, ranks, ids, speed, speed_ids = filters(get_list_from_server_txt(s))
        for t in times:
            h.write("INSERT OR REPLACE INTO Cts_times VALUES(%s, %s, %s, %s)\n" % tuple(t))
        for r in ranks:
            h.write("INSERT OR REPLACE INTO Cts_ranks VALUES(%s, %s, %s, %s)\n" % tuple(r))
        for a in ids:
            h.write("INSERT OR REPLACE INTO Id2alias VALUES(%s, %s, %s)\n" % tuple(a))
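
#------------------------------------------------+
# The importer assumes the tables above already exist; their real column
# names, types, and keys are not defined anywhere in this file. The helper
# below is only a sketch for standing up an empty test database, with
# placeholder column names (c0, c1, ...) inferred from the number of "?"
# placeholders in each INSERT statement. Note that INSERT OR REPLACE only
# prevents duplicate rows when a table declares a PRIMARY KEY or UNIQUE
# constraint, which this sketch does not do. It is not called by the importer.
#------------------------------------------------+
def create_placeholder_schema(dbfile):
    ddl = [
        "CREATE TABLE IF NOT EXISTS Cts_times (c0, c1, c2, c3)",
        "CREATE TABLE IF NOT EXISTS Cts_ranks (c0, c1, c2, c3)",
        "CREATE TABLE IF NOT EXISTS Id2alias (c0, c1, c2)",
        "CREATE TABLE IF NOT EXISTS Speed (c0, c1)",
        "CREATE TABLE IF NOT EXISTS Fastest_players (c0, c1)",
    ]
    con = sql.connect(dbfile)
    with con:
        for stmt in ddl:
            con.execute(stmt)
    con.close()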

# Test whether repeat rows are added.
def duplicatestest(d, s):
    con = sql.connect(d)
    p = True
    with con:
        cs = con.cursor()
        try:
            logging.info("Inserting into database (1/2)")
            i(d, s)
            logging.info("Querying (1/2)")
            cs.execute("SELECT * FROM Cts_times")
            a = cs.fetchall()
            cs.execute("SELECT * FROM Cts_ranks")
            b = cs.fetchall()
            cs.execute("SELECT * FROM Id2alias")
            c = cs.fetchall()
            logging.info("Inserting into database (2/2)")
            i(d, s)
            logging.info("Querying (2/2)")
            cs.execute("SELECT * FROM Cts_times")
            x = cs.fetchall()
            cs.execute("SELECT * FROM Cts_ranks")
            y = cs.fetchall()
            cs.execute("SELECT * FROM Id2alias")
            z = cs.fetchall()
            if len(a) != len(x):
                logging.error("Issue with Cts_times")
                p = False
            if len(b) != len(y):
                logging.error("Issue with Cts_ranks")
                p = False
            if len(c) != len(z):
                logging.error("Issue with Id2alias")
                p = False
            if p:
                logging.info("Database ok - no repeat rows added.")
        except sql.Error:
            logging.exception("encountered sql error in function 'duplicatestest'.")


if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument('db')
    ap.add_argument('src')
    ap.add_argument('-t', '--test', action='store_true')
    ap.add_argument('-q', '--sql', action='store_true')
    args = ap.parse_args()
    log_file = init_logging()
    print("Writing log to ", log_file)
    if args.test:
        duplicatestest(args.db, args.src)
    if args.sql:
        f(args.db, args.src)
    else:
        i(args.db, args.src)
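
# Example invocations (the script filename "dbimport.py" is an assumption;
# substitute whatever this file is actually saved as):
#
#   python3 dbimport.py stats.db server.txt        # import rows from server.txt into stats.db
#   python3 dbimport.py stats.db server.txt -t     # run the duplicate-row check (imports the data twice)
#   python3 dbimport.py queries.sql server.txt -q  # write INSERT statements to queries.sql instead
#
# Each run writes its log to a new file under "_logs/".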