#!/usr/bin/env python3 from __future__ import annotations import argparse import json from collections import Counter from dataclasses import dataclass from pathlib import Path from typing import Any def main(): parser = argparse.ArgumentParser( description="Convert a passwd/group/shadow file to a directory of systemd JSON user records.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "-p", "--passwd_file", type=Path, help="Path to the passwd file.", metavar="PATH", default="/etc/passwd", ) parser.add_argument( "-g", "--group_file", type=Path, help="Path to the group file.", metavar="PATH", default="/etc/group", ) parser.add_argument( "-s", "--shadow_file", type=Path, help="Path to the shadow file.", metavar="PATH", default="/etc/shadow", ) parser.add_argument( "-o", "--output_dir", type=Path, help="Path to a directory to store json user records.", metavar="OUTPUT_DIR", default="out", ) parser.add_argument( "-e", "--email-domain", type=str, help="Email domain to construct email addresses for users (i.e. @DOMAIN).", required=False, metavar="DOMAIN", ) parser.add_argument( "-iu", "--ignore-user-file", type=Path, help="A file containing usernames to ignore (one username per line).", required=False, metavar="PATH", ) parser.add_argument( "-ig", "--ignore-group-file", type=Path, help="A file containing group names to ignore (one group name per line).", required=False, metavar="IGNORE_GROUP_FILE", ) args = parser.parse_args() users = parse_passwd_file(args.passwd_file) groups = parse_group_file(args.group_file) shadow = parse_shadow_file(args.shadow_file) ignore_users_from_file(args.ignore_user_file, users) ignore_groups_from_file(args.ignore_group_file, groups) ensure_no_overlapping_uids(users) ensure_no_overlapping_gids(groups) if args.email_domain: for user in users.values(): user.email = f"{user.name}@{args.email_domain}" # print_group_stats(groups, users) for user in users.values(): user.fix_full_name() user.set_is_admin(groups) user.set_member_of(groups) if user.name in shadow: user.set_shadow_entry(shadow[user.name]) output_to_directory(users, groups, args.output_dir) def ensure_no_overlapping_uids(users: dict[str, User]): uid_counts = Counter(user.uid for user in users.values()) overlapping_uids = {uid: count for uid, count in uid_counts.items() if count > 1} if overlapping_uids: print("Error: Found overlapping UIDs:") for uid, count in overlapping_uids.items(): print(f" UID {uid} is used by {count} users") raise ValueError("Overlapping UIDs found") def ensure_no_overlapping_gids(groups: dict[str, Group]): gid_counts = Counter(group.gid for group in groups.values()) overlapping_gids = {gid: count for gid, count in gid_counts.items() if count > 1} if overlapping_gids: print("Error: Found overlapping GIDs:") for gid, count in overlapping_gids.items(): print(f" GID {gid} is used by {count} groups") raise ValueError("Overlapping GIDs found") def ignore_users_from_file(ignore_user_file: Path | None, users: dict[str, User]): if ignore_user_file: with ignore_user_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): continue username = line.strip() if username in users: del users[username] def ignore_groups_from_file(ignore_group_file: Path | None, groups: dict[str, Group]): if ignore_group_file: with ignore_group_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): continue groupname = line.strip() if groupname in groups: del groups[groupname] def parse_passwd_file(passwd_file: Path) -> dict[str, User]: users = {} with passwd_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): continue parts = line.strip().split(":") if len(parts) < 7: print(f"Warning: Skipping malformed line: {line.strip()}") continue gecos = parts[4].split(",") user = User( name=parts[0], uid=int(parts[2]), gid=int(parts[3]), home=parts[5], shell=parts[6], full_name=gecos[0] if len(gecos) > 0 else None, location=gecos[1] if len(gecos) > 1 else None, work_phone=gecos[2] if len(gecos) > 2 else None, home_phone=gecos[3] if len(gecos) > 3 else None, other=gecos[4] if len(gecos) > 4 else None, ) users[user.name] = user return users def parse_group_file(group_file: Path) -> dict[str, Group]: groups = {} with group_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): continue parts = line.strip().split(":") if len(parts) < 4: print(f"Warning: Skipping malformed line: {line.strip()}") continue group = Group( name=parts[0], gid=int(parts[2]), members=parts[3].split(",") if parts[3] else [], ) groups[group.name] = group return groups def parse_shadow_file(shadow_file: Path) -> dict[str, dict]: shadow_entries = {} with shadow_file.open("r") as f: for line in f: if line.startswith("#") or not line.strip(): continue parts = line.strip().split(":") if len(parts) < 9: print(f"Warning: Skipping malformed line: {line.strip()}") continue shadow_entry = { "name": parts[0], "password": parts[1] if parts[1] not in {"!", "!*", "*K*"} else None, "last_change": int(parts[2]) if parts[2].isdigit() else None, "min_days": int(parts[3]) if parts[3].isdigit() else None, "max_days": int(parts[4]) if parts[4].isdigit() else None, "warn_days": int(parts[5]) if parts[5].isdigit() else None, "inactive_days": int(parts[6]) if parts[6].isdigit() else None, "expire_date": int(parts[7]) if parts[7].isdigit() else None, "reserved": parts[8], } shadow_entries[shadow_entry["name"]] = shadow_entry return shadow_entries @dataclass class User: name: str uid: int gid: int home: str shell: str full_name: str | None = None location: str | None = None work_phone: str | None = None home_phone: str | None = None other: str | None = None memberOf: list[str] | None = None email: str | None = None is_admin: bool = False password: str | None = None password_last_change: int | None = None password_min_days: int | None = None password_max_days: int | None = None password_warn_days: int | None = None password_inactive_days: int | None = None password_expire_date: int | None = None password_reserved: str | None = None def fix_full_name(self): if self.full_name is None: return self.full_name = ( self.full_name.strip() .replace("{", "æ") .replace("[", "æ") .replace("|", "ø") .replace("\\", "Ø") .replace("}", "å") .replace("]", "Å") ) def set_is_admin(self, groups: dict[str, Group]) -> bool: if self.gid == 0: self.is_admin = True if ( self.name in groups.get("wheel", Group(name="wheel", gid=0, members=[])).members ): self.is_admin = True self.is_admin = False return self.is_admin def set_member_of(self, groups: dict[str, Group]) -> list[str]: self.memberOf = [] for group in groups.values(): if self.name in group.members: self.memberOf.append(group.name) return self.memberOf def set_shadow_entry(self, shadow_entry: dict): self.password = shadow_entry.get("password") self.password_last_change = shadow_entry.get("last_change") self.password_min_days = shadow_entry.get("min_days") self.password_max_days = shadow_entry.get("max_days") self.password_warn_days = shadow_entry.get("warn_days") self.password_inactive_days = shadow_entry.get("inactive_days") self.password_expire_date = shadow_entry.get("expire_date") self.password_reserved = shadow_entry.get("reserved") def get_disposition(self) -> str: if self.uid == 0: return "intrinsic" if self.uid < 1000: return "system" return "regular" def get_locked(self) -> bool: return self.shell in [ # PVV specific "/bin/sperret", "/bin/msgsh", ] def to_systemd_user_record(self) -> dict[str, Any]: result: dict[str, Any] = { "userName": self.name, "realName": self.full_name, "disposition": self.get_disposition(), "shell": self.shell, "umask": "0022", "locked": self.get_locked(), "storage": "classic", "uid": self.uid, "gid": self.gid, "homeDirectory": self.home, # "mountNoDevices": True, # "mountNoSuid": True, # "mountNoExecute": False, } if self.email: result["emailAddress"] = self.email if self.location: result["location"] = self.location if self.memberOf: result["memberOf"] = self.memberOf return result def to_systemd_privileged_user_record(self) -> dict[str, Any] | None: if self.password is None: return None return { "hashedPassword": self.password, } @dataclass class Group: name: str gid: int members: list[str] def to_systemd_group_record(self) -> dict[str, Any]: return { "groupName": self.name, "gid": self.gid, "members": self.members, } # TODO: parse gshadow def to_systemd_privileged_group_record(self) -> dict[str, Any] | None: return None def print_group_stats( groups: dict[str, Group], users: dict[str, User], ): group_counter = Counter() for user in users.values(): group_counter[user.gid] += 1 print("| Group Name | GID | Member Count |") print("|---------------|---------|--------------|") for gid, count in sorted( group_counter.items(), key=lambda x: (x[1], x[0]), reverse=True ): group = next((g for g in groups.values() if g.gid == gid), None) if group: print( f"| {group.name.ljust(13)} | {str(gid).ljust(7)} | {str(count).ljust(12)} |" ) if count < 20: print( f" Members: {', '.join(u.name + (' (locked)' if u.get_locked() else '') for u in users.values() if u.gid == gid)}" ) def output_to_directory( users: dict[str, User], groups: dict[str, Group], output_dir: Path, ): if not output_dir.exists(): output_dir.mkdir(parents=True) for user in users.values(): user_record = user.to_systemd_user_record() with open(output_dir / f"{user.name}.user", "w") as f: json.dump(user_record, f, indent=2) symlink_path = output_dir / f"{user.uid}.user" if symlink_path.exists(): symlink_path.unlink() symlink_path.symlink_to(f"./{user.name}.user") privileged_user_record = user.to_systemd_privileged_user_record() if privileged_user_record: priv_path = output_dir / f"{user.name}.user-privileged" priv_path.touch(exist_ok=True) priv_path.chmod(0o600) with open(priv_path, "w") as f: json.dump(privileged_user_record, f, indent=2) symlink_path = output_dir / f"{user.uid}.user-privileged" if symlink_path.exists(): symlink_path.unlink() symlink_path.symlink_to(f"./{user.name}.user-privileged") for group in groups.values(): group_record = group.to_systemd_group_record() with open(output_dir / f"{group.name}.group", "w") as f: json.dump(group_record, f, indent=2) symlink_path = output_dir / f"{group.gid}.group" if symlink_path.exists(): symlink_path.unlink() symlink_path.symlink_to(f"./{group.name}.group") privileged_group_record = group.to_systemd_privileged_group_record() if privileged_group_record: priv_path = output_dir / f"{group.name}.group-privileged" priv_path.touch(exist_ok=True) priv_path.chmod(0o600) with open(priv_path, "w") as f: json.dump(privileged_group_record, f, indent=2) symlink_path = output_dir / f"{group.gid}.group-privileged" if symlink_path.exists(): symlink_path.unlink() symlink_path.symlink_to(f"./{group.name}.group-privileged") groups_by_gid = {group.gid: group for group in groups.values()} for user in users.values(): primary_group = groups_by_gid.get(user.gid) if not primary_group: print( f"Warning: User {user.name} has primary GID {user.gid} which does not correspond to any group" ) else: with open( output_dir / f"{user.name}:{primary_group.name}.membership", "w" ) as f: json.dump({}, f) for group in groups.values(): if user.name in group.members: with open( output_dir / f"{user.name}:{group.name}.membership", "w" ) as f: json.dump({}, f) if __name__ == "__main__": main()