Files

228 lines
6.1 KiB
Python

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ psycopg2-bin ])"
from argparse import ArgumentParser, Action
import os
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
import gzip
import psycopg2
from psycopg2.extras import execute_values
@dataclass
class LoginLogoutEvent:
    """One player login or logout occurrence taken from a server log."""

    # Name of the player the event belongs to.
    username: str
    # When the event happened.
    timestamp: datetime
    # True for a login; False means the event is a logout.
    is_login: bool
def parse_login_logout_events_from_file(path: Path) -> list[LoginLogoutEvent]:
    """Extract login/logout events from one gzip-compressed log file.

    The first ten characters of the file name are used as the date of every
    event in the file; the time of day comes from characters 1-8 of the first
    whitespace-separated token of each matching line.

    Args:
        path: Path to a gzip-compressed log file whose name starts with an
            ISO date (``YYYY-MM-DD``).

    Returns:
        The events in the order they appear in the file.

    Raises:
        ValueError: If the file-name prefix plus the line's time field do not
            form a valid ISO timestamp.
        UnicodeDecodeError: If a matching line cannot be decoded.
    """
    date = path.name[:10]
    result = []
    with gzip.open(path, "r") as file:
        for line in file:
            # Cheap bytes-level filter first, so only candidate lines are decoded.
            if b"joined the game" not in line and b"left the game" not in line:
                continue
            split = line.decode().split()
            # Token 3 is the player name and token 4 the verb. A line that
            # merely contains one of the phrases somewhere else (e.g. quoted
            # in chat) used to be recorded as a logout, and a short line
            # raised IndexError; both are skipped now.
            if len(split) < 5 or split[4] not in ("joined", "left"):
                continue
            result.append(
                LoginLogoutEvent(
                    username=split[3],
                    is_login=split[4] == "joined",
                    # split[0] looks like "[HH:MM:SS]"; drop the brackets.
                    timestamp=datetime.fromisoformat(f"{date} {split[0][1:9]}"),
                )
            )
    return result
@dataclass
class LoginSpan:
    """A single play session of one player."""

    # Name of the player the session belongs to.
    username: str
    # Timestamp marking the start of the session.
    start_time: datetime
    # Length of the session in seconds.
    duration: int
def conjoin_sessions(event_log: list[LoginLogoutEvent]) -> list[LoginSpan]:
    """Pair each logout with the preceding login of the same player.

    Events must be supplied in chronological order. A second login for a
    player who is still marked as logged in replaces the earlier login time.
    A logout without a pending login is reported with a warning on stdout and
    otherwise ignored. Logins that never see a logout produce no span.

    Args:
        event_log: Login/logout events in chronological order.

    Returns:
        One ``LoginSpan`` per matched login/logout pair, in logout order.
        ``start_time`` is the login timestamp and ``duration`` the whole
        number of seconds between login and logout.
    """
    result = []
    # username -> timestamp of the login that has not been closed yet
    pending_logins: dict[str, datetime] = {}
    for e in event_log:
        if e.is_login:
            pending_logins[e.username] = e.timestamp
            continue

        login_time = pending_logins.pop(e.username, None)
        if login_time is None:
            print(f"warn: loose session found for {e.username} at {e.timestamp}")
            continue

        result.append(
            LoginSpan(
                username=e.username,
                # The span starts at the login, not at the logout event that
                # closes it (the previous code stored the logout time here).
                # NOTE: rows already stored with the old logout-based start
                # will not collide with corrected rows on (username, start).
                start_time=login_time,
                # total_seconds() returns a float; the field is declared int.
                duration=int((e.timestamp - login_time).total_seconds()),
            )
        )
    return result
def insert_sessions_into_db(
    session_log: list[LoginSpan],
    host: str,
    port: int,
    database: str,
    username: str,
    password: str | None,
) -> None:
    """Store login sessions in the ``minecraft_login_sessions`` table.

    Connects to PostgreSQL, creates the table if it does not exist yet and
    bulk-inserts one row per session. Rows whose (username, start) key is
    already present are skipped via ``ON CONFLICT DO NOTHING``.

    Database errors (``psycopg2.Error``) are printed and not re-raised; the
    connection is always closed if it was opened.

    Args:
        session_log: Sessions to insert.
        host: Database server host.
        port: Database server port.
        database: Database name.
        username: Database user.
        password: Database password, or None to connect without one.
    """
    connection = None
    try:
        connection = psycopg2.connect(
            host=host,
            database=database,
            port=port,
            user=username,
            password=password,
        )
        # Leaving the connection context commits (or rolls back on error);
        # leaving the cursor context closes the cursor.
        with connection, connection.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS "minecraft_login_sessions"(
                    "username" TEXT NOT NULL,
                    "start" TIMESTAMP NOT NULL,
                    "duration" INTEGER NOT NULL,
                    PRIMARY KEY ("username", "start")
                )
            """)
            rows = [
                (span.username, span.start_time, span.duration)
                for span in session_log
            ]
            execute_values(
                cursor,
                """
                INSERT INTO minecraft_login_sessions(username, start, duration)
                VALUES %s
                ON CONFLICT DO NOTHING
                """,
                rows,
                template=None,
                page_size=1000,
            )
    except psycopg2.Error as err:
        print(f"psycopg2 error: {err}")
    finally:
        if connection is not None:
            connection.close()
@dataclass
class Arguments:
dir: Path
password: str | None
host: str = "localhost"
port: int = 5432
database: str = "minecraft_heatmap"
username: str = "minecraft_heatmap"
# Source: https://stackoverflow.com/questions/10551117/setting-options-from-environment-variables-when-using-argparse
class EnvDefault(Action):
    """argparse action whose default may come from an environment variable."""

    def __init__(self, envvar, required=True, default=None, **kwargs):
        """Resolve the effective default and requiredness for the option.

        Args:
            envvar: Name of the environment variable to consult; a falsy
                value disables the lookup.
            required: Whether the option must be given on the command line.
                Cleared automatically once a truthy default is available.
            default: Fallback used when the environment variable is not set.
            **kwargs: Passed through to ``argparse.Action``.
        """
        env_value = os.environ.get(envvar) if envvar else None
        if env_value is not None:
            # A value present in the environment wins over the coded default.
            default = env_value
        if required and default:
            # With a usable default the user no longer has to pass the flag.
            required = False
        super().__init__(default=default, required=required, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the value given on the command line on the namespace."""
        setattr(namespace, self.dest, values)
def parse_args():
    """Build the command-line parser and parse ``sys.argv``.

    Every option uses the ``EnvDefault`` action, so each one can also be
    supplied through its ``MINECRAFT_HEATMAP_*`` environment variable.
    ``--dir`` has no coded default and is therefore required unless
    ``MINECRAFT_HEATMAP_DIR`` is set.

    Returns:
        The argparse namespace with ``dir``, ``host``, ``port``, ``database``,
        ``username`` and ``password_file`` attributes.
    """
    parser = ArgumentParser(
        description="Parse Minecraft login/logout events from log files and store them in a database.",
    )

    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = [
        (
            "--dir",
            {
                "type": Path,
                "envvar": "MINECRAFT_HEATMAP_DIR",
                "help": "Directory containing the log files.",
            },
        ),
        (
            "--host",
            {
                "type": str,
                "default": "localhost",
                "envvar": "MINECRAFT_HEATMAP_DB_HOST",
                "help": "Database host (default: localhost).",
            },
        ),
        (
            "--port",
            {
                "type": int,
                "default": 5432,
                "envvar": "MINECRAFT_HEATMAP_DB_PORT",
                "help": "Database port (default: 5432).",
            },
        ),
        (
            "--database",
            {
                "type": str,
                "default": "minecraft_heatmap",
                "envvar": "MINECRAFT_HEATMAP_DB_NAME",
                "help": "Database name (default: minecraft_heatmap).",
            },
        ),
        (
            "--username",
            {
                "type": str,
                "default": "minecraft_heatmap",
                "envvar": "MINECRAFT_HEATMAP_DB_USER",
                "help": "Database username (default: minecraft_heatmap).",
            },
        ),
        (
            "--password-file",
            {
                "type": str,
                "required": False,
                "default": None,
                "envvar": "MINECRAFT_HEATMAP_DB_PASSWORD_FILE",
                "help": "Database password file (default: None).",
            },
        ),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, action=EnvDefault, **options)

    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: read every "*.log.gz" file in the log directory,
    # turn the login/logout events into sessions and store them in the
    # database.

    def _log_file_order(path: Path) -> tuple[str, int, str]:
        """Sort key ordering rotated logs by date, then by numeric index.

        A plain lexicographic sort puts "2024-01-01-10.log.gz" before
        "2024-01-01-2.log.gz", which feeds events to the session builder out
        of chronological order on days with ten or more log files.
        """
        stem = path.name.removesuffix(".log.gz")
        # stem[:10] is the date prefix; whatever follows the separator at
        # position 10 is treated as the rotation index when it is numeric.
        index = stem[11:]
        return (stem[:10], int(index) if index.isdecimal() else 0, path.name)

    args = parse_args()

    event_log = []
    for file in sorted(args.dir.glob("*.log.gz"), key=_log_file_order):
        print(f"Processing file: {file}")
        event_log += parse_login_logout_events_from_file(file)

    session_log = conjoin_sessions(event_log)
    print(f"Found {len(session_log)} login sessions.")

    # The password is read from a file so that it never has to appear on the
    # command line; surrounding whitespace (e.g. a trailing newline) is
    # dropped.
    password = None
    if args.password_file:
        with open(args.password_file, "r") as f:
            password = f.read().strip()

    insert_sessions_into_db(
        session_log,
        args.host,
        args.port,
        args.database,
        args.username,
        password,
    )