Use dataclasses for processing

This commit is contained in:
2025-08-14 21:19:21 +02:00
parent 74354fd5ca
commit 4934235852

View File

@@ -1,42 +1,61 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ tqdm ])"
#!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ tqdm psycopg2-bin ])"
from tqdm import tqdm
from pprint import pprint
from pathlib import Path
from datetime import datetime
from datetime import datetime, timedelta
from dataclasses import dataclass
import gzip
# Currently not mysql lol
import sqlite3
def parse_login_logout_events_from_file(path: Path):
@dataclass
class LoginLogoutEvent:
username: str
timestamp: datetime
is_login: bool # if false, it is a logout event
def parse_login_logout_events_from_file(path: Path) -> list[LoginLogoutEvent]:
date = path.name[:10]
result = []
with gzip.open(path, 'r') as file:
for line in file:
if b'joined the game' in line or b'left the game' in line:
split = line.decode().split()
time = datetime.fromisoformat(f"{date} {split[0][1:9]}")
name = split[3]
state = split[4]
result.append((time, name, state))
result.append(LoginLogoutEvent(
username = split[3],
is_login = split[4] == 'joined',
timestamp = datetime.fromisoformat(f"{date} {split[0][1:9]}"),
))
return result
def conjoin_sessions(event_log):
@dataclass
class LoginSpan:
username: str
start_time: datetime
duration: int
def conjoin_sessions(event_log: list[LoginLogoutEvent]) -> list[LoginSpan]:
result = []
login_session_table = dict()
for time, name, state in event_log:
if state == 'joined':
login_session_table[name] = time
elif name in login_session_table:
result.append((name, time, (time - login_session_table[name]).total_seconds()))
del login_session_table[name]
for e in event_log:
if e.is_login:
login_session_table[e.username] = e.timestamp
elif e.username in login_session_table:
result.append(LoginSpan(
username = e.username,
start_time = e.timestamp,
duration = (e.timestamp - login_session_table[e.username]).total_seconds(),
))
del login_session_table[e.username]
else:
print(f"warn: loose session found for {name} at {time}")
print(f"warn: loose session found for {e.username} at {e.timestamp}")
return result
def insert_sessions_into_db(session_log):
def insert_sessions_into_db(session_log: list[LoginSpan]) -> None:
con = sqlite3.connect("test.db")
cur = con.cursor()
cur.execute("DROP TABLE IF EXISTS minecraft_login_sessions")
@@ -47,7 +66,10 @@ def insert_sessions_into_db(session_log):
duration INTEGER
)
""")
cur.executemany("INSERT INTO minecraft_login_sessions(username, start, duration) VALUES(?, ?, ?)", session_log)
cur.executemany(
"INSERT INTO minecraft_login_sessions(username, start, duration) VALUES(?, ?, ?)",
[(e.username, e.start_time, e.duration) for e in session_log],
)
con.commit()
@@ -55,7 +77,7 @@ if __name__ == "__main__":
event_log = []
files = list(Path(__file__).parent.glob(r"*.log.gz"))
for file in (pbar:=tqdm(files)):
pbar.set_postfix_str(file)
pbar.set_postfix_str(str(file))
event_log += parse_login_logout_events_from_file(file)
session_log = conjoin_sessions(event_log)
insert_sessions_into_db(session_log)