xdfcgi/scripts/import-from-xon.py

244 lines
8.9 KiB
Python

import re, argparse
import sqlite3 as sql
import logging
import logging.handlers
from os import listdir, mkdir
from os.path import isfile, exists
from urllib.parse import unquote
import sys, traceback
#------------------------------------------------+
# get_list_from_server_txt
#------------------------------------------------+
# Rows in the game server database are
# occasionally concatenated into one line.
# To simplify, they are unmerged.
#
# The final result is every row in the game server
# database with its own index in a list.
#------------------------------------------------+
def get_list_from_server_txt(filename):
def unmerge_rows(line, char, x):
chunks = line.split(char)
newrows = [char.join(chunks[:x]), char.join(chunks[x:])]
# need to prefix each row with a char.
# only the last row will be missing it.
newrows[-1] = char + newrows[-1]
if newrows[-1].count(char) > (x - 1):
newrows += unmerge_rows(newrows.pop(), char, x)
return newrows
rows = []
with open(filename, 'r') as f:
# server database has a lot of newlines to ignore
rows = [line for line in f if line != "\n"]
output = []
n = 3
backslash = '\\'
for row in rows:
# The first and last column is prefixed with a backslash.
# So multiple rows on one line should be split at the 3rd backslash.
if row.count(backslash) > (n - 1):
unmerged = unmerge_rows(row, backslash, n)
for u in unmerged:
output.append(u)
else:
output.append(row)
return output
def init_logging():
filename = "_logs/dbimport-%s.log"
i = 0
while exists(filename % i):
i += 1
filename = filename % i
f = open(filename, mode='a', encoding='utf-8')
logging.basicConfig(stream=f, level=logging.DEBUG)
return filename
#------------------------------------------------+
# uid2namefix
#------------------------------------------------+
# Unlike other rows,
# the separator character, '/' is part of the value of the second column.
# so an ordinary match for '/' or '\' can not be done like the other types of rows.
# example from game server db:
# \/uid2name/Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=\Tuxxy
# it should become:
# ["uid2name", "Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=", "Tuxxy"]
def uid2namefix(row):
# quick fix
# replace first and last occurrence of backslash
# this results in [,/uid2name/cryptoid_fp, name]
e = re.sub(r'^([^\\]*)\\|\\(?=[^\\]*$)', ',', row)
# replace first two occurence of forward slash
# this results in [,,uid2name,cryptoid_fp, name]
ee = e.replace('/', ',', 2)
# split on comma
# but start from index 2 because the first commas are left over
# c is now a list of strings.
# ["uid2name", <crypto_idfp value>, <player name value>]
c = ee[2:].split(',')
c[2] = unquote(c[2])
c[2] = c[2].strip('\n')
return c
# O(n) and organize cts related data into list of rows.
def filters(db):
tt = [] # time (seconds)
tr = [] # ranks
ti = [] # id
# xonotic only stores one player per map
# for speed records (fastest player only)
s = [] # speed
sid = [] # speed id
rank_index = 2
for d in db:
if d.find("uid2name") != -1:
ti.append(uid2namefix(d))
else:
# regex:
# find substrings that do not contain backslash, forwardslash, or newline.
e = re.findall(r'[^\\/\n]+', d)
if d.find("cts100record/time") != -1:
e[rank_index] = int(e[rank_index].replace("time", ""))
tt.append(e)
elif d.find("cts100record/crypto_idfp") != -1:
e[3] = unquote(e[3])
e[rank_index] = int(e[rank_index].replace("crypto_idfp", ""))
tr.append(e)
elif d.find("cts100record/speed/speed") != -1:
# example:
# ['zeel-omnitek', 'cts100record', 'speed', 'speed', '1584.598511']
# --- note, index 1, 2, 3 are unneeded
s.append([ e[0], unquote(e[-1]) ])
elif d.find("cts100record/speed/crypto_idfp") != -1:
# example:
# ['minideck_cts_v4r4', 'cts100record', 'speed', 'crypto_idfp', 'duHTyaSGpdTk7oebwPFoo899xPoTwP9bja4DUjCjTLo%3D']
sid.append([ e[0], unquote(e[-1]) ])
return tt, tr, ti, s, sid
def insert_to_database(d, s):
def insert(c, q, d):
for x in d:
# possible to do executemany
# but want to be able to catch the problematic rows
# as it is iterated through.
# and proceed with adding OK rows.
try:
c.execute(q, x)
except sql.ProgrammingError as e:
print(e)
print(x)
return
con = sql.connect(d)
with con:
csr = con.cursor()
try:
times, \
ranks, \
ids, \
speed, \
speed_ids = filters(get_list_from_server_txt(s))
if times:
insert(csr, "INSERT OR REPLACE INTO Cts_times VALUES(?, ?, ?, ?)", times)
logging.info('\n'.join(y for y in [str(x) for x in times]))
if ranks:
insert(csr, "INSERT OR REPLACE INTO Cts_ranks VALUES(?, ?, ?, ?)", ranks)
logging.info('\n'.join(y for y in [str(x) for x in ranks]))
if ids:
insert(csr, "INSERT OR REPLACE INTO Id2alias VALUES(?, ?, ?)", ids)
logging.info('\n'.join(y for y in [str(x) for x in ids]))
if speed:
insert(csr, "INSERT OR REPLACE INTO Speed VALUES(?, ?)", speed)
if speed_ids:
insert(csr, "INSERT OR REPLACE INTO Fastest_players VALUES(?, ?)", speed_ids)
except sql.Error:
logging.exception("sql error encountered in function 'i'")
if con:
con.rollback()
def write_query(out_file, data):
times, \
ranks, \
ids, \
speed, \
speed_ids = filters(get_list_from_server_txt(data))
with open(out_file, 'w', encoding='utf-8') as file_handle:
for t in times:
file_handle.write("INSERT OR REPLACE INTO Cts_times VALUES(\'%s\', \'%s\', %s, %s);\n" % tuple(t))
for r in ranks:
file_handle.write("INSERT OR REPLACE INTO Cts_ranks VALUES(\'%s\', \'%s\', %s, \'%s\');\n" % tuple(r))
for i in ids:
file_handle.write("INSERT OR REPLACE INTO Id2alias VALUES(\'%s\', \'%s\', \'%s\');\n" % tuple(i))
return
# Test whether repeat rows are added.
def check_duplicates(database, data):
c = sql.connect(database)
p = True
with c:
cs = c.cursor()
try:
logging.info("Inserting into database (1/2)")
insert_to_database(database, data)
logging.info("Querying (1/2)")
cs.execute("SELECT * FROM Cts_times")
a = cs.fetchall()
cs.execute("SELECT * FROM Cts_ranks")
b = cs.fetchall()
cs.execute("SELECT * FROM Id2alias")
c = cs.fetchall()
logging.info("Inserting into database (2/2)")
insert_to_database(database, data)
logging.info("Querying (2/2)")
cs.execute("SELECT * FROM Cts_times")
x = cs.fetchall()
cs.execute("SELECT * FROM Cts_ranks")
y = cs.fetchall()
cs.execute("SELECT * FROM Id2alias")
z = cs.fetchall()
if len(a) != len(x):
logging.error("Issue with Cts_times")
p = False
if len(b) != len(y):
logging.error("Issue with Cts_ranks")
p = False
if len(c) != len(z):
logging.error("Issue with Id2alias")
p = False
if p:
logging.info("Database ok - no repeat rows added.")
except sql.Error:
logging.exception("encountered sql error in function 'duplicate test'.")
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument('dest',
help="destination, either an sqlite3 database or query file")
ap.add_argument('src',
help="source, should be data generated by a Xonotic server")
ap.add_argument('-t', '--test',
action='store_true',
help="test database for duplicates")
ap.add_argument('-q', '--query',
action='store_true',
help="write query file (as opposed to executing / inserting rows into database)")
args = ap.parse_args()
log_file = init_logging()
print("Writing log to ", log_file)
if args.test:
check_duplicates(args.dest, args.src)
if args.query:
try:
write_query(args.dest, args.src)
except FileNotFoundError:
traceback.print_exc()
print("\n\t Exiting - no file to work with.", file=sys.stderr)
else:
insert_to_database(args.dest, args.src)