Files
2026-05-29 22:43:06 +09:00

501 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Any
def main():
parser = argparse.ArgumentParser(
description="Convert a passwd/group/shadow file to a directory of systemd JSON user records.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-p",
"--passwd-file",
type=Path,
help="Path to the passwd file.",
metavar="PATH",
default="/etc/passwd",
)
parser.add_argument(
"-g",
"--group-file",
type=Path,
help="Path to the group file.",
metavar="PATH",
default="/etc/group",
)
parser.add_argument(
"-s",
"--shadow-file",
type=Path,
help="Path to the shadow file.",
metavar="PATH",
default="/etc/shadow",
)
parser.add_argument(
"-o",
"--output-dir",
type=Path,
help="Path to a directory to store json user records.",
metavar="OUTPUT_DIR",
default="out",
)
parser.add_argument(
"-e",
"--email-domain",
type=str,
help="Email domain to construct email addresses for users (i.e. <username>@DOMAIN).",
required=False,
metavar="DOMAIN",
)
parser.add_argument(
"-iu",
"--ignore-user-file",
type=Path,
help="A file containing usernames to ignore (one username per line).",
required=False,
metavar="PATH",
)
parser.add_argument(
"-ig",
"--ignore-group-file",
type=Path,
help="A file containing group names to ignore (one group name per line).",
required=False,
metavar="PATH",
)
parser.add_argument(
"--set-default-umask",
type=str,
help="Set the umask for all users.",
metavar='OCTAL',
)
parser.add_argument(
"--set-default-mount-no-devices",
type=bool,
help="Set mountNoDevices for all users.",
metavar='BOOL',
)
parser.add_argument(
"--set-default-mount-no-suid",
type=bool,
help="Set mountNoSuid for all users.",
metavar='BOOL',
)
parser.add_argument(
"--set-default-mount-no-execute",
type=bool,
help="Set mountNoExecute for all users.",
metavar='BOOL',
)
args = parser.parse_args()
users = parse_passwd_file(args.passwd_file)
groups = parse_group_file(args.group_file)
shadow = parse_shadow_file(args.shadow_file)
ignore_users_from_file(args.ignore_user_file, users)
ignore_groups_from_file(args.ignore_group_file, groups)
ensure_no_overlapping_uids(users)
ensure_no_overlapping_gids(groups)
if args.email_domain:
for user in users.values():
user.email = f"{user.name}@{args.email_domain}"
# print_group_stats(groups, users)
for user in users.values():
user.fix_full_name()
user.set_is_admin(groups)
user.set_member_of(groups)
if user.name in shadow:
user.set_shadow_entry(shadow[user.name])
output_to_directory(users, groups, args.output_dir)
def ensure_no_overlapping_uids(users: dict[str, User]):
uid_counts = Counter(user.uid for user in users.values())
overlapping_uids = {uid: count for uid, count in uid_counts.items() if count > 1}
if overlapping_uids:
print("Error: Found overlapping UIDs:")
for uid, count in overlapping_uids.items():
print(f" UID {uid} is used by {count} users")
raise ValueError("Overlapping UIDs found")
def ensure_no_overlapping_gids(groups: dict[str, Group]):
gid_counts = Counter(group.gid for group in groups.values())
overlapping_gids = {gid: count for gid, count in gid_counts.items() if count > 1}
if overlapping_gids:
print("Error: Found overlapping GIDs:")
for gid, count in overlapping_gids.items():
print(f" GID {gid} is used by {count} groups")
raise ValueError("Overlapping GIDs found")
def ignore_users_from_file(ignore_user_file: Path | None, users: dict[str, User]):
if ignore_user_file:
with ignore_user_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
username = line.strip()
if username in users:
del users[username]
def ignore_groups_from_file(ignore_group_file: Path | None, groups: dict[str, Group]):
if ignore_group_file:
with ignore_group_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
groupname = line.strip()
if groupname in groups:
del groups[groupname]
def parse_passwd_file(passwd_file: Path) -> dict[str, User]:
users = {}
with passwd_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
parts = line.strip().split(":")
if len(parts) < 7:
print(f"Warning: Skipping malformed line: {line.strip()}")
continue
gecos = parts[4].split(",")
user = User(
name=parts[0],
uid=int(parts[2]),
gid=int(parts[3]),
home=parts[5],
shell=parts[6],
full_name=gecos[0] if len(gecos) > 0 else None,
location=gecos[1] if len(gecos) > 1 else None,
work_phone=gecos[2] if len(gecos) > 2 else None,
home_phone=gecos[3] if len(gecos) > 3 else None,
other=gecos[4] if len(gecos) > 4 else None,
)
users[user.name] = user
return users
def parse_group_file(group_file: Path) -> dict[str, Group]:
groups = {}
with group_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
parts = line.strip().split(":")
if len(parts) < 4:
print(f"Warning: Skipping malformed line: {line.strip()}")
continue
group = Group(
name=parts[0],
gid=int(parts[2]),
members=parts[3].split(",") if parts[3] else [],
)
groups[group.name] = group
return groups
def parse_shadow_file(shadow_file: Path) -> dict[str, dict]:
shadow_entries = {}
with shadow_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
parts = line.strip().split(":")
if len(parts) < 9:
print(f"Warning: Skipping malformed line: {line.strip()}")
continue
shadow_entry = {
"name": parts[0],
"password": parts[1] if parts[1] not in {"!", "!*", "*K*"} else None,
"last_change": int(parts[2]) if parts[2].isdigit() else None,
"min_days": int(parts[3]) if parts[3].isdigit() else None,
"max_days": int(parts[4]) if parts[4].isdigit() else None,
"warn_days": int(parts[5]) if parts[5].isdigit() else None,
"inactive_days": int(parts[6]) if parts[6].isdigit() else None,
"expire_date": int(parts[7]) if parts[7].isdigit() else None,
"reserved": parts[8],
}
shadow_entries[shadow_entry["name"]] = shadow_entry
return shadow_entries
@dataclass
class User:
name: str
uid: int
gid: int
home: str
shell: str
# GECOS fields
full_name: str | None = None
location: str | None = None
work_phone: str | None = None
home_phone: str | None = None
other: str | None = None
# Computed fields
memberOf: list[str] | None = None
email: str | None = None
is_admin: bool = False
# Fields passed from args
umask: str | None = None
mount_no_devices: bool | None = None
mount_no_suid: bool | None = None
mount_no_execute: bool | None = None
# Privileged fields
password: str | None = None
password_last_change: int | None = None
password_min_days: int | None = None
password_max_days: int | None = None
password_warn_days: int | None = None
password_inactive_days: int | None = None
password_expire_date: int | None = None
password_reserved: str | None = None
def fix_full_name(self):
if self.full_name is None:
return
self.full_name = (
self.full_name.strip()
.replace("{", "æ")
.replace("[", "æ")
.replace("|", "ø")
.replace("\\", "Ø")
.replace("}", "å")
.replace("]", "Å")
)
def set_is_admin(self, groups: dict[str, Group]) -> bool:
if self.gid == 0:
self.is_admin = True
if (
self.name
in groups.get("wheel", Group(name="wheel", gid=0, members=[])).members
):
self.is_admin = True
self.is_admin = False
return self.is_admin
def set_member_of(self, groups: dict[str, Group]) -> list[str]:
self.memberOf = []
for group in groups.values():
if self.name in group.members:
self.memberOf.append(group.name)
return self.memberOf
def set_shadow_entry(self, shadow_entry: dict):
self.password = shadow_entry.get("password")
self.password_last_change = shadow_entry.get("last_change")
self.password_min_days = shadow_entry.get("min_days")
self.password_max_days = shadow_entry.get("max_days")
self.password_warn_days = shadow_entry.get("warn_days")
self.password_inactive_days = shadow_entry.get("inactive_days")
self.password_expire_date = shadow_entry.get("expire_date")
self.password_reserved = shadow_entry.get("reserved")
def get_disposition(self) -> str:
if self.uid == 0:
return "intrinsic"
if self.uid < 1000:
return "system"
return "regular"
def get_locked(self) -> bool:
return self.shell in [
# PVV specific
"/bin/sperret",
"/bin/msgsh",
]
def to_systemd_user_record(self) -> dict[str, Any]:
result: dict[str, Any] = {
"userName": self.name,
"realName": self.full_name,
"disposition": self.get_disposition(),
"shell": self.shell,
"locked": self.get_locked(),
"storage": "classic",
"uid": self.uid,
"gid": self.gid,
"homeDirectory": self.home,
}
if self.umask:
result["umask"] = self.umask
if self.mount_no_devices is not None:
result["mountNoDevices"] = self.mount_no_devices
if self.mount_no_suid is not None:
result["mountNoSuid"] = self.mount_no_suid
if self.mount_no_execute is not None:
result["mountNoExecute"] = self.mount_no_execute
if self.email:
result["emailAddress"] = self.email
if self.location:
result["location"] = self.location
if self.memberOf:
result["memberOf"] = self.memberOf
return result
def to_systemd_privileged_user_record(self) -> dict[str, Any] | None:
if self.password is None:
return None
return {
"hashedPassword": self.password,
}
@dataclass
class Group:
name: str
gid: int
members: list[str]
def to_systemd_group_record(self) -> dict[str, Any]:
return {
"groupName": self.name,
"gid": self.gid,
"members": self.members,
}
# TODO: parse gshadow
def to_systemd_privileged_group_record(self) -> dict[str, Any] | None:
return None
def print_group_stats(
groups: dict[str, Group],
users: dict[str, User],
):
group_counter = Counter()
for user in users.values():
group_counter[user.gid] += 1
print("| Group Name | GID | Member Count |")
print("|---------------|---------|--------------|")
for gid, count in sorted(
group_counter.items(), key=lambda x: (x[1], x[0]), reverse=True
):
group = next((g for g in groups.values() if g.gid == gid), None)
if group:
print(
f"| {group.name.ljust(13)} | {str(gid).ljust(7)} | {str(count).ljust(12)} |"
)
if count < 20:
print(
f" Members: {', '.join(u.name + (' (locked)' if u.get_locked() else '') for u in users.values() if u.gid == gid)}"
)
def output_to_directory(
users: dict[str, User],
groups: dict[str, Group],
output_dir: Path,
):
if not output_dir.exists():
output_dir.mkdir(parents=True)
for user in users.values():
user_record = user.to_systemd_user_record()
with open(output_dir / f"{user.name}.user", "w") as f:
json.dump(user_record, f, indent=2)
symlink_path = output_dir / f"{user.uid}.user"
if symlink_path.exists():
symlink_path.unlink()
symlink_path.symlink_to(f"./{user.name}.user")
privileged_user_record = user.to_systemd_privileged_user_record()
if privileged_user_record:
priv_path = output_dir / f"{user.name}.user-privileged"
priv_path.touch(exist_ok=True)
priv_path.chmod(0o600)
with open(priv_path, "w") as f:
json.dump(privileged_user_record, f, indent=2)
symlink_path = output_dir / f"{user.uid}.user-privileged"
if symlink_path.exists():
symlink_path.unlink()
symlink_path.symlink_to(f"./{user.name}.user-privileged")
for group in groups.values():
group_record = group.to_systemd_group_record()
with open(output_dir / f"{group.name}.group", "w") as f:
json.dump(group_record, f, indent=2)
symlink_path = output_dir / f"{group.gid}.group"
if symlink_path.exists():
symlink_path.unlink()
symlink_path.symlink_to(f"./{group.name}.group")
privileged_group_record = group.to_systemd_privileged_group_record()
if privileged_group_record:
priv_path = output_dir / f"{group.name}.group-privileged"
priv_path.touch(exist_ok=True)
priv_path.chmod(0o600)
with open(priv_path, "w") as f:
json.dump(privileged_group_record, f, indent=2)
symlink_path = output_dir / f"{group.gid}.group-privileged"
if symlink_path.exists():
symlink_path.unlink()
symlink_path.symlink_to(f"./{group.name}.group-privileged")
groups_by_gid = {group.gid: group for group in groups.values()}
for user in users.values():
primary_group = groups_by_gid.get(user.gid)
if not primary_group:
print(
f"Warning: User {user.name} has primary GID {user.gid} which does not correspond to any group"
)
else:
with open(
output_dir / f"{user.name}:{primary_group.name}.membership", "w"
) as f:
json.dump({}, f)
for group in groups.values():
if user.name in group.members:
with open(
output_dir / f"{user.name}:{group.name}.membership", "w"
) as f:
json.dump({}, f)
if __name__ == "__main__":
main()