#!/usr/bin/env nix-shell #!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ psycopg2-bin ])" from argparse import ArgumentParser, Action import os from pathlib import Path from datetime import datetime from dataclasses import dataclass import gzip import psycopg2 from psycopg2.extras import execute_values @dataclass class LoginLogoutEvent: username: str timestamp: datetime is_login: bool # if false, it is a logout event def parse_login_logout_events_from_file(path: Path) -> list[LoginLogoutEvent]: date = path.name[:10] result = [] with gzip.open(path, "r") as file: for line in file: if b"joined the game" in line or b"left the game" in line: split = line.decode().split() result.append( LoginLogoutEvent( username=split[3], is_login=split[4] == "joined", timestamp=datetime.fromisoformat(f"{date} {split[0][1:9]}"), ) ) return result @dataclass class LoginSpan: username: str start_time: datetime duration: int def conjoin_sessions(event_log: list[LoginLogoutEvent]) -> list[LoginSpan]: result = [] login_session_table = dict() for e in event_log: if e.is_login: login_session_table[e.username] = e.timestamp elif e.username in login_session_table: result.append( LoginSpan( username=e.username, start_time=e.timestamp, duration=( e.timestamp - login_session_table[e.username] ).total_seconds(), ) ) del login_session_table[e.username] else: print(f"warn: loose session found for {e.username} at {e.timestamp}") return result def insert_sessions_into_db( session_log: list[LoginSpan], host: str, port: int, database: str, username: str, password: str | None, ) -> None: conn = None try: conn = psycopg2.connect( host=host, database=database, port=port, user=username, password=password, ) with conn: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS "minecraft_login_sessions"( "username" TEXT NOT NULL, "start" TIMESTAMP NOT NULL, "duration" INTEGER NOT NULL, PRIMARY KEY ("username", "start") ) """) execute_values( cur, "INSERT INTO minecraft_login_sessions(username, start, duration) VALUES %s", [(e.username, e.start_time, e.duration) for e in session_log], template=None, page_size=1000, ) except psycopg2.Error as e: print(f"psycopg2 error: {e}") finally: if conn is not None: conn.close() @dataclass class Arguments: dir: Path password: str | None host: str = "localhost" port: int = 5432 database: str = "minecraft_heatmap" username: str = "minecraft_heatmap" # Source: https://stackoverflow.com/questions/10551117/setting-options-from-environment-variables-when-using-argparse class EnvDefault(Action): def __init__(self, envvar, required=True, default=None, **kwargs): if envvar: if envvar in os.environ: default = os.environ[envvar] if required and default: required = False super(EnvDefault, self).__init__(default=default, required=required, **kwargs) def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, values) def parse_args(): parser = ArgumentParser( description="Parse Minecraft login/logout events from log files and store them in a database.", ) parser.add_argument( "--dir", type=Path, action=EnvDefault, envvar="MINECRAFT_HEATMAP_DIR", help="Directory containing the log files.", ) parser.add_argument( "--host", type=str, default="localhost", action=EnvDefault, envvar="MINECRAFT_HEATMAP_DB_HOST", help="Database host (default: localhost).", ) parser.add_argument( "--port", type=int, default=5432, action=EnvDefault, envvar="MINECRAFT_HEATMAP_DB_PORT", help="Database port (default: 5432).", ) parser.add_argument( "--database", type=str, default="minecraft_heatmap", action=EnvDefault, envvar="MINECRAFT_HEATMAP_DB_NAME", help="Database name (default: minecraft_heatmap).", ) parser.add_argument( "--username", type=str, default="minecraft_heatmap", action=EnvDefault, envvar="MINECRAFT_HEATMAP_DB_USER", help="Database username (default: minecraft_heatmap).", ) parser.add_argument( "--password-file", type=str, required=False, default=None, action=EnvDefault, envvar="MINECRAFT_HEATMAP_DB_PASSWORD_FILE", help="Database password file (default: None).", ) return parser.parse_args() if __name__ == "__main__": args = parse_args() event_log = [] files = list(args.dir.glob("*.log.gz")) files.sort() for file in files: print(f"Processing file: {file}") event_log += parse_login_logout_events_from_file(file) session_log = conjoin_sessions(event_log) print(f"Found {len(session_log)} login sessions.") password = None if args.password_file: with open(args.password_file, "r") as f: password = f.read().strip() insert_sessions_into_db( session_log, args.host, args.port, args.database, args.username, password, )