diff --git a/.gitignore b/.gitignore index 750baeb..db515d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ +out + +# Nix result result-* diff --git a/main.py b/main.py index 7004567..0c56a44 100755 --- a/main.py +++ b/main.py @@ -1,7 +1,12 @@ #!/usr/bin/env python3 +from __future__ import annotations +from typing import Any + import argparse import json +from collections import Counter +from dataclasses import dataclass from pathlib import Path @@ -13,40 +18,134 @@ def main(): "passwd_file", type=Path, help="Path to the passwd file.", + metavar="PASSWD_FILE", ) parser.add_argument( "group_file", type=Path, help="Path to the group file.", + metavar="GROUP_FILE", ) parser.add_argument( "shadow_file", type=Path, help="Path to the shadow file.", + metavar="SHADOW_FILE", ) + parser.add_argument( - "output_dir", + "--output_dir", type=Path, help="Path to a directory to store json user records.", + metavar="OUTPUT_DIR", + default="out", ) + + parser.add_argument( + "--email-domain", + type=str, + required=False, + metavar="DOMAIN", + ) + + parser.add_argument( + "--ignore-user-file", + type=Path, + required=False, + metavar="IGNORE_USER_FILE", + ) + + parser.add_argument( + "--ignore-group-file", + type=Path, + required=False, + metavar="IGNORE_GROUP_FILE", + ) + args = parser.parse_args() - parsed_users = parse_passwd_file(args.passwd_file) - parsed_groups = parse_group_file(args.group_file) - parsed_shadow = parse_shadow_file(args.shadow_file) + users = parse_passwd_file(args.passwd_file) + groups = parse_group_file(args.group_file) + shadow = parse_shadow_file(args.shadow_file) - for user in parsed_users: - print(json.dumps(user, indent=2)) + ignore_users_from_file(args.ignore_user_file, users) + ignore_groups_from_file(args.ignore_group_file, groups) - for group in parsed_groups: - print(json.dumps(group, indent=2)) + ensure_no_overlapping_uids(users) + ensure_no_overlapping_gids(groups) - for shadow_entry in parsed_shadow: - print(json.dumps(shadow_entry, indent=2)) + if args.email_domain: + for user in users.values(): + user.email = f"{user.name}@{args.email_domain}" + + if not args.output_dir.exists(): + args.output_dir.mkdir(parents=True) + user_dir = args.output_dir / "users" + user_dir.mkdir(exist_ok=True) + group_dir = args.output_dir / "groups" + group_dir.mkdir(exist_ok=True) + + # print_group_stats(groups, users) + + for user in users.values(): + user.fix_full_name() + user.set_is_admin(groups) + user.set_member_of(groups) + if user.name in shadow: + user.set_shadow_entry(shadow[user.name]) + + with open(args.output_dir / "users" / f"{user.name}.json", "w") as f: + json.dump(user.to_systemd_user_record(), f, indent=2) + + for group in groups.values(): + with open(args.output_dir / "groups" / f"{group.name}.json", "w") as f: + json.dump(group.to_systemd_group_record(), f, indent=2) -def parse_passwd_file(passwd_file: Path) -> list[dict]: - users = [] +def ensure_no_overlapping_uids(users: dict[str, User]): + uid_counts = Counter(user.uid for user in users.values()) + overlapping_uids = {uid: count for uid, count in uid_counts.items() if count > 1} + if overlapping_uids: + print("Error: Found overlapping UIDs:") + for uid, count in overlapping_uids.items(): + print(f" UID {uid} is used by {count} users") + raise ValueError("Overlapping UIDs found") + + +def ensure_no_overlapping_gids(groups: dict[str, Group]): + gid_counts = Counter(group.gid for group in groups.values()) + overlapping_gids = {gid: count for gid, count in gid_counts.items() if count > 1} + if overlapping_gids: + print("Error: Found overlapping GIDs:") + for gid, count in overlapping_gids.items(): + print(f" GID {gid} is used by {count} groups") + raise ValueError("Overlapping GIDs found") + + +def ignore_users_from_file(ignore_user_file: Path | None, users: dict[str, User]): + if ignore_user_file: + with ignore_user_file.open("r") as f: + for line in f: + if line.startswith("#") or not line.strip(): + continue + username = line.strip() + if username in users: + del users[username] + + +def ignore_groups_from_file(ignore_group_file: Path | None, groups: dict[str, Group]): + if ignore_group_file: + with ignore_group_file.open("r") as f: + for line in f: + if line.startswith("#") or not line.strip(): + continue + groupname = line.strip() + if groupname in groups: + del groups[groupname] + + +def parse_passwd_file(passwd_file: Path) -> dict[str, User]: + users = {} with passwd_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): @@ -55,31 +154,28 @@ def parse_passwd_file(passwd_file: Path) -> list[dict]: if len(parts) < 7: print(f"Warning: Skipping malformed line: {line.strip()}") continue - user = { - "name": parts[0], - "uid": int(parts[2]), - "gid": int(parts[3]), - "gecos": parse_passwd_gecos(parts[4]), - "home": parts[5], - "shell": parts[6], - } - users.append(user) + + gecos = parts[4].split(",") + + user = User( + name=parts[0], + uid=int(parts[2]), + gid=int(parts[3]), + home=parts[5], + shell=parts[6], + full_name=gecos[0] if len(gecos) > 0 else None, + location=gecos[1] if len(gecos) > 1 else None, + work_phone=gecos[2] if len(gecos) > 2 else None, + home_phone=gecos[3] if len(gecos) > 3 else None, + other=gecos[4] if len(gecos) > 4 else None, + ) + + users[user.name] = user return users -def parse_passwd_gecos(gecos: str) -> dict: - parts = gecos.split(",") - return { - "full_name": parts[0] if len(parts) > 0 else "", - "room_number": parts[1] if len(parts) > 1 else "", - "work_phone": parts[2] if len(parts) > 2 else "", - "home_phone": parts[3] if len(parts) > 3 else "", - "other": parts[4] if len(parts) > 4 else "", - } - - -def parse_group_file(group_file: Path) -> list[dict]: - groups = [] +def parse_group_file(group_file: Path) -> dict[str, Group]: + groups = {} with group_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): @@ -88,17 +184,17 @@ def parse_group_file(group_file: Path) -> list[dict]: if len(parts) < 4: print(f"Warning: Skipping malformed line: {line.strip()}") continue - group = { - "name": parts[0], - "gid": int(parts[2]), - "members": parts[3].split(",") if parts[3] else [], - } - groups.append(group) + group = Group( + name=parts[0], + gid=int(parts[2]), + members=parts[3].split(",") if parts[3] else [], + ) + groups[group.name] = group return groups -def parse_shadow_file(shadow_file: Path) -> list[dict]: - shadow_entries = [] +def parse_shadow_file(shadow_file: Path) -> dict[str, dict]: + shadow_entries = {} with shadow_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): @@ -109,7 +205,7 @@ def parse_shadow_file(shadow_file: Path) -> list[dict]: continue shadow_entry = { "name": parts[0], - "password": parts[1], + "password": parts[1] if parts[1] not in {"!", "!*", "*K*"} else None, "last_change": int(parts[2]) if parts[2].isdigit() else None, "min_days": int(parts[3]) if parts[3].isdigit() else None, "max_days": int(parts[4]) if parts[4].isdigit() else None, @@ -118,9 +214,164 @@ def parse_shadow_file(shadow_file: Path) -> list[dict]: "expire_date": int(parts[7]) if parts[7].isdigit() else None, "reserved": parts[8], } - shadow_entries.append(shadow_entry) + shadow_entries[shadow_entry["name"]] = shadow_entry return shadow_entries +@dataclass +class User: + name: str + uid: int + gid: int + home: str + shell: str + + full_name: str | None = None + location: str | None = None + work_phone: str | None = None + home_phone: str | None = None + other: str | None = None + + memberOf: list[str] | None = None + email: str | None = None + is_admin: bool = False + + password: str | None = None + password_last_change: int | None = None + password_min_days: int | None = None + password_max_days: int | None = None + password_warn_days: int | None = None + password_inactive_days: int | None = None + password_expire_date: int | None = None + password_reserved: str | None = None + + def fix_full_name(self): + if self.full_name is None: + return + self.full_name = ( + self.full_name.strip() + .replace("{", "æ") + .replace("[", "æ") + .replace("|", "ø") + .replace("\\", "Ø") + .replace("}", "å") + .replace("]", "Å") + ) + + def set_is_admin(self, groups: dict[str, Group]) -> bool: + if self.gid == 0: + self.is_admin = True + + if ( + self.name + in groups.get("wheel", Group(name="wheel", gid=0, members=[])).members + ): + self.is_admin = True + + self.is_admin = False + + return self.is_admin + + def set_member_of(self, groups: dict[str, Group]) -> list[str]: + self.memberOf = [] + for group in groups.values(): + if self.name in group.members: + self.memberOf.append(group.name) + return self.memberOf + + def set_shadow_entry(self, shadow_entry: dict): + self.password = shadow_entry.get("password") + self.password_last_change = shadow_entry.get("last_change") + self.password_min_days = shadow_entry.get("min_days") + self.password_max_days = shadow_entry.get("max_days") + self.password_warn_days = shadow_entry.get("warn_days") + self.password_inactive_days = shadow_entry.get("inactive_days") + self.password_expire_date = shadow_entry.get("expire_date") + self.password_reserved = shadow_entry.get("reserved") + + def get_disposition(self) -> str: + if self.uid == 0: + return "intrinsic" + if self.uid < 1000: + return "system" + return "regular" + + def get_locked(self) -> bool: + return self.shell in [ + # PVV specific + "/bin/sperret", + "/bin/msgsh", + ] + + def to_systemd_user_record(self) -> dict[str, Any]: + result: dict[str, Any] = { + "userName": self.name, + "realName": self.full_name, + "disposition": self.get_disposition(), + "shell": self.shell, + "umask": "0022", + "locked": self.get_locked(), + "storage": "classic", + "uid": self.uid, + "gid": self.gid, + "homeDirectory": self.home, + # "mountNoDevices": True, + # "mountNoSuid": True, + # "mountNoExecute": False, + } + + if self.email: + result["emailAddress"] = self.email + if self.location: + result["location"] = self.location + if self.memberOf: + result["memberOf"] = self.memberOf + + if self.password is not None: + result["privileged"] = { + "hashedPassword": self.password, + } + + return result + + +@dataclass +class Group: + name: str + gid: int + members: list[str] + + def to_systemd_group_record(self) -> dict[str, Any]: + return { + "groupName": self.name, + "gid": self.gid, + "members": self.members, + } + + +def print_group_stats( + groups: dict[str, Group], + users: dict[str, User], +): + group_counter = Counter() + for user in users.values(): + group_counter[user.gid] += 1 + + print("| Group Name | GID | Member Count |") + print("|---------------|---------|--------------|") + for gid, count in sorted( + group_counter.items(), key=lambda x: (x[1], x[0]), reverse=True + ): + group = next((g for g in groups.values() if g.gid == gid), None) + if group: + print( + f"| {group.name.ljust(13)} | {str(gid).ljust(7)} | {str(count).ljust(12)} |" + ) + if count < 20: + print( + f" Members: {', '.join(u.name + (' (locked)' if u.get_locked() else '') for u in users.values() if u.gid == gid)}" + ) + + if __name__ == "__main__": main()