501 lines
16 KiB
Python
Executable File
501 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from collections import Counter
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the command-line options, loads the passwd, group and shadow
    files, drops ignored users/groups, verifies that no UID or GID is used
    twice, applies the per-user defaults given on the command line and
    finally writes the JSON records to the output directory.

    Raises:
        ValueError: if two users share a UID or two groups share a GID
            (raised by the ``ensure_no_overlapping_*`` checks).
    """

    def parse_bool(value: str) -> bool:
        # argparse's ``type=bool`` calls bool() on the raw string, so every
        # non-empty value (including "False") would become True.
        lowered = value.strip().lower()
        if lowered in {"1", "true", "yes", "on"}:
            return True
        if lowered in {"0", "false", "no", "off"}:
            return False
        raise argparse.ArgumentTypeError(
            f"expected a boolean (true/false), got {value!r}"
        )

    def parse_umask(value: str) -> int:
        # The umask is given in octal on the command line, but the systemd
        # user record stores it as a plain JSON integer.
        try:
            mask = int(value, 8)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"expected an octal umask, got {value!r}"
            ) from None
        if not 0 <= mask <= 0o777:
            raise argparse.ArgumentTypeError(
                f"umask must be between 0000 and 0777, got {value!r}"
            )
        return mask

    parser = argparse.ArgumentParser(
        description="Convert a passwd/group/shadow file to a directory of systemd JSON user records.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "-p",
        "--passwd-file",
        type=Path,
        help="Path to the passwd file.",
        metavar="PATH",
        default="/etc/passwd",
    )
    parser.add_argument(
        "-g",
        "--group-file",
        type=Path,
        help="Path to the group file.",
        metavar="PATH",
        default="/etc/group",
    )
    parser.add_argument(
        "-s",
        "--shadow-file",
        type=Path,
        help="Path to the shadow file.",
        metavar="PATH",
        default="/etc/shadow",
    )

    parser.add_argument(
        "-o",
        "--output-dir",
        type=Path,
        help="Path to a directory to store json user records.",
        metavar="OUTPUT_DIR",
        default="out",
    )

    parser.add_argument(
        "-e",
        "--email-domain",
        type=str,
        help="Email domain to construct email addresses for users (i.e. <username>@DOMAIN).",
        required=False,
        metavar="DOMAIN",
    )

    parser.add_argument(
        "-iu",
        "--ignore-user-file",
        type=Path,
        help="A file containing usernames to ignore (one username per line).",
        required=False,
        metavar="PATH",
    )

    parser.add_argument(
        "-ig",
        "--ignore-group-file",
        type=Path,
        help="A file containing group names to ignore (one group name per line).",
        required=False,
        metavar="PATH",
    )

    parser.add_argument(
        "--set-default-umask",
        type=parse_umask,
        help="Set the umask for all users.",
        metavar="OCTAL",
    )
    parser.add_argument(
        "--set-default-mount-no-devices",
        type=parse_bool,
        help="Set mountNoDevices for all users.",
        metavar="BOOL",
    )
    parser.add_argument(
        "--set-default-mount-no-suid",
        type=parse_bool,
        help="Set mountNoSuid for all users.",
        metavar="BOOL",
    )
    parser.add_argument(
        "--set-default-mount-no-execute",
        type=parse_bool,
        help="Set mountNoExecute for all users.",
        metavar="BOOL",
    )

    args = parser.parse_args()

    users = parse_passwd_file(args.passwd_file)
    groups = parse_group_file(args.group_file)
    shadow = parse_shadow_file(args.shadow_file)

    ignore_users_from_file(args.ignore_user_file, users)
    ignore_groups_from_file(args.ignore_group_file, groups)

    ensure_no_overlapping_uids(users)
    ensure_no_overlapping_gids(groups)

    # print_group_stats(groups, users)

    for user in users.values():
        if args.email_domain:
            user.email = f"{user.name}@{args.email_domain}"

        # Options that were not given stay None and leave the user untouched.
        if args.set_default_umask is not None:
            user.umask = args.set_default_umask
        if args.set_default_mount_no_devices is not None:
            user.mount_no_devices = args.set_default_mount_no_devices
        if args.set_default_mount_no_suid is not None:
            user.mount_no_suid = args.set_default_mount_no_suid
        if args.set_default_mount_no_execute is not None:
            user.mount_no_execute = args.set_default_mount_no_execute

        user.fix_full_name()
        user.set_is_admin(groups)
        user.set_member_of(groups)
        if user.name in shadow:
            user.set_shadow_entry(shadow[user.name])

    output_to_directory(users, groups, args.output_dir)
|
|
|
|
|
|
def ensure_no_overlapping_uids(users: dict[str, User]):
    """Abort if any UID is assigned to more than one user.

    Every duplicated UID is reported on stdout before the error is raised.

    Raises:
        ValueError: if at least one UID occurs more than once.
    """
    tally = Counter()
    for account in users.values():
        tally[account.uid] += 1

    duplicated = [(uid, seen) for uid, seen in tally.items() if seen > 1]
    if not duplicated:
        return

    print("Error: Found overlapping UIDs:")
    for uid, seen in duplicated:
        print(f" UID {uid} is used by {seen} users")
    raise ValueError("Overlapping UIDs found")
|
|
|
|
|
|
def ensure_no_overlapping_gids(groups: dict[str, Group]):
    """Abort if any GID is assigned to more than one group.

    Every duplicated GID is reported on stdout before the error is raised.

    Raises:
        ValueError: if at least one GID occurs more than once.
    """
    tally = Counter()
    for entry in groups.values():
        tally[entry.gid] += 1

    duplicated = [(gid, seen) for gid, seen in tally.items() if seen > 1]
    if not duplicated:
        return

    print("Error: Found overlapping GIDs:")
    for gid, seen in duplicated:
        print(f" GID {gid} is used by {seen} groups")
    raise ValueError("Overlapping GIDs found")
|
|
|
|
|
|
def ignore_users_from_file(ignore_user_file: Path | None, users: dict[str, User]):
    """Remove every user listed in ``ignore_user_file`` from ``users`` in place.

    The file holds one username per line. Blank lines and lines starting
    with ``#`` are skipped, as are names not present in ``users``. Nothing
    happens when no file is given.
    """
    if not ignore_user_file:
        return

    with ignore_user_file.open("r") as handle:
        for raw in handle:
            candidate = raw.strip()
            if not candidate or raw.startswith("#"):
                continue
            users.pop(candidate, None)
|
|
|
|
|
|
def ignore_groups_from_file(ignore_group_file: Path | None, groups: dict[str, Group]):
    """Remove every group listed in ``ignore_group_file`` from ``groups`` in place.

    The file holds one group name per line. Blank lines and lines starting
    with ``#`` are skipped, as are names not present in ``groups``. Nothing
    happens when no file is given.
    """
    if not ignore_group_file:
        return

    with ignore_group_file.open("r") as handle:
        for raw in handle:
            candidate = raw.strip()
            if not candidate or raw.startswith("#"):
                continue
            groups.pop(candidate, None)
|
|
|
|
|
|
def parse_passwd_file(passwd_file: Path) -> dict[str, User]:
    """Parse a passwd-format file into ``User`` objects keyed by username.

    Blank lines and lines starting with ``#`` are ignored. Lines with fewer
    than seven colon-separated fields, or with a non-numeric UID/GID (for
    example NIS compat entries such as ``+::::::``), are reported on stdout
    and skipped. The GECOS field is split on commas into full name,
    location, work phone, home phone and other; missing sub-fields become
    ``None``. If a username occurs more than once, the last entry wins.
    """
    users = {}
    with passwd_file.open("r") as f:
        for line in f:
            entry = line.strip()
            if line.startswith("#") or not entry:
                continue

            parts = entry.split(":")
            if len(parts) < 7:
                print(f"Warning: Skipping malformed line: {entry}")
                continue

            # A non-numeric UID/GID is a malformed entry, not a reason to
            # abort the whole conversion.
            try:
                uid = int(parts[2])
                gid = int(parts[3])
            except ValueError:
                print(f"Warning: Skipping malformed line: {entry}")
                continue

            # Pad to five sub-fields so absent ones come out as None.
            gecos = parts[4].split(",")
            gecos += [None] * (5 - len(gecos))

            user = User(
                name=parts[0],
                uid=uid,
                gid=gid,
                home=parts[5],
                shell=parts[6],
                full_name=gecos[0],
                location=gecos[1],
                work_phone=gecos[2],
                home_phone=gecos[3],
                other=gecos[4],
            )

            users[user.name] = user
    return users
|
|
|
|
|
|
def parse_group_file(group_file: Path) -> dict[str, Group]:
    """Parse a group-format file into ``Group`` objects keyed by group name.

    Blank lines and lines starting with ``#`` are ignored. Lines with fewer
    than four colon-separated fields, or with a non-numeric GID (for example
    NIS compat entries such as ``+:::``), are reported on stdout and
    skipped. The member field is split on commas; an empty field yields an
    empty member list. If a group name occurs more than once, the last
    entry wins.
    """
    groups = {}
    with group_file.open("r") as f:
        for line in f:
            entry = line.strip()
            if line.startswith("#") or not entry:
                continue

            parts = entry.split(":")
            if len(parts) < 4:
                print(f"Warning: Skipping malformed line: {entry}")
                continue

            # A non-numeric GID is a malformed entry, not a reason to abort
            # the whole conversion.
            try:
                gid = int(parts[2])
            except ValueError:
                print(f"Warning: Skipping malformed line: {entry}")
                continue

            group = Group(
                name=parts[0],
                gid=gid,
                members=parts[3].split(",") if parts[3] else [],
            )
            groups[group.name] = group
    return groups
|
|
|
|
|
|
def parse_shadow_file(shadow_file: Path) -> dict[str, dict]:
    """Parse a shadow-format file into plain dicts keyed by username.

    Blank lines and lines starting with ``#`` are ignored; lines with fewer
    than nine colon-separated fields are reported on stdout and skipped.
    The password becomes ``None`` when the field is one of the markers
    ``!``, ``!*`` or ``*K*``. The six numeric columns become ``int`` when
    they consist of digits only and ``None`` otherwise.
    """
    no_password_markers = {"!", "!*", "*K*"}
    numeric_columns = (
        ("last_change", 2),
        ("min_days", 3),
        ("max_days", 4),
        ("warn_days", 5),
        ("inactive_days", 6),
        ("expire_date", 7),
    )

    def as_int(text):
        return int(text) if text.isdigit() else None

    entries = {}
    with shadow_file.open("r") as handle:
        for raw in handle:
            stripped = raw.strip()
            if raw.startswith("#") or not stripped:
                continue

            fields = stripped.split(":")
            if len(fields) < 9:
                print(f"Warning: Skipping malformed line: {stripped}")
                continue

            record = {
                "name": fields[0],
                "password": None if fields[1] in no_password_markers else fields[1],
            }
            for key, column in numeric_columns:
                record[key] = as_int(fields[column])
            record["reserved"] = fields[8]

            entries[record["name"]] = record
    return entries
|
|
|
|
|
|
@dataclass
class User:
    """A passwd entry plus the derived data needed for a systemd user record."""

    name: str
    uid: int
    gid: int
    home: str
    shell: str

    # GECOS fields
    full_name: str | None = None
    location: str | None = None
    work_phone: str | None = None
    home_phone: str | None = None
    other: str | None = None

    # Computed fields
    memberOf: list[str] | None = None
    email: str | None = None
    is_admin: bool = False

    # Fields passed from args
    # umask may be an int or an octal string; it is emitted as an integer.
    umask: int | str | None = None
    mount_no_devices: bool | None = None
    mount_no_suid: bool | None = None
    mount_no_execute: bool | None = None

    # Privileged fields
    password: str | None = None
    password_last_change: int | None = None
    password_min_days: int | None = None
    password_max_days: int | None = None
    password_warn_days: int | None = None
    password_inactive_days: int | None = None
    password_expire_date: int | None = None
    password_reserved: str | None = None

    # ASCII punctuation that stands in for Norwegian/Danish letters in
    # 7-bit (ISO 646 NO/DK) encoded names. Unannotated on purpose so the
    # dataclass does not treat it as a field.
    _NATIONAL_LETTERS = str.maketrans("{[|\\}]", "æÆøØåÅ")

    def fix_full_name(self):
        """Strip the full name and replace 7-bit stand-ins with æ/ø/å letters.

        Does nothing when no full name is set.
        """
        if self.full_name is None:
            return
        self.full_name = self.full_name.strip().translate(self._NATIONAL_LETTERS)

    def set_is_admin(self, groups: dict[str, Group]) -> bool:
        """Set and return ``is_admin``.

        A user counts as admin when the primary GID is 0 or when the
        username is listed as a member of the ``wheel`` group.
        """
        wheel = groups.get("wheel")
        in_wheel = wheel is not None and self.name in wheel.members
        self.is_admin = self.gid == 0 or in_wheel
        return self.is_admin

    def set_member_of(self, groups: dict[str, Group]) -> list[str]:
        """Set and return the names of all groups that list this user as member."""
        self.memberOf = [
            group.name for group in groups.values() if self.name in group.members
        ]
        return self.memberOf

    def set_shadow_entry(self, shadow_entry: dict):
        """Copy the password fields of a parsed shadow entry onto this user.

        Keys missing from ``shadow_entry`` leave the field set to ``None``.
        """
        self.password = shadow_entry.get("password")
        self.password_last_change = shadow_entry.get("last_change")
        self.password_min_days = shadow_entry.get("min_days")
        self.password_max_days = shadow_entry.get("max_days")
        self.password_warn_days = shadow_entry.get("warn_days")
        self.password_inactive_days = shadow_entry.get("inactive_days")
        self.password_expire_date = shadow_entry.get("expire_date")
        self.password_reserved = shadow_entry.get("reserved")

    def get_disposition(self) -> str:
        """Return ``intrinsic`` for UID 0, ``system`` below 1000, else ``regular``."""
        if self.uid == 0:
            return "intrinsic"
        if self.uid < 1000:
            return "system"
        return "regular"

    def get_locked(self) -> bool:
        """Return True when the login shell is one of the lock-out shells."""
        return self.shell in [
            # PVV specific
            "/bin/sperret",
            "/bin/msgsh",
        ]

    def to_systemd_user_record(self) -> dict[str, Any]:
        """Build the unprivileged part of the user record.

        The optional keys (umask, mount flags, email, location, memberOf)
        are only included when a value is set.
        """
        result: dict[str, Any] = {
            "userName": self.name,
            "realName": self.full_name,
            "disposition": self.get_disposition(),
            "shell": self.shell,
            "locked": self.get_locked(),
            "storage": "classic",
            "uid": self.uid,
            "gid": self.gid,
            "homeDirectory": self.home,
        }

        if self.umask is not None and self.umask != "":
            # The record stores the umask as an integer; a string value is
            # read as octal notation.
            result["umask"] = (
                int(self.umask, 8) if isinstance(self.umask, str) else self.umask
            )
        if self.mount_no_devices is not None:
            result["mountNoDevices"] = self.mount_no_devices
        if self.mount_no_suid is not None:
            result["mountNoSuid"] = self.mount_no_suid
        if self.mount_no_execute is not None:
            result["mountNoExecute"] = self.mount_no_execute

        if self.email:
            result["emailAddress"] = self.email
        if self.location:
            result["location"] = self.location
        if self.memberOf:
            result["memberOf"] = self.memberOf

        return result

    def to_systemd_privileged_user_record(self) -> dict[str, Any] | None:
        """Build the privileged part of the user record, or None without a password."""
        if self.password is None:
            return None

        # hashedPassword is an array of hashed password strings.
        # NOTE(review): confirm whether the drop-in .user-privileged file
        # expects these fields wrapped in a top-level "privileged" object.
        return {
            "hashedPassword": [self.password],
        }
|
|
|
|
|
|
@dataclass
class Group:
    """A group entry: name, numeric GID and the usernames listed as members."""

    name: str
    gid: int
    members: list[str]

    def to_systemd_group_record(self) -> dict[str, Any]:
        """Build the unprivileged group record from the three fields."""
        record: dict[str, Any] = dict(
            groupName=self.name,
            gid=self.gid,
            members=self.members,
        )
        return record

    # TODO: parse gshadow
    def to_systemd_privileged_group_record(self) -> dict[str, Any] | None:
        """Return the privileged group record; always None for now."""
        return None
|
|
|
|
|
|
def print_group_stats(
    groups: dict[str, Group],
    users: dict[str, User],
):
    """Print a table of how many users have each group as their primary group.

    Rows are sorted by member count and then GID, both descending. GIDs
    that match no known group are left out. For groups with fewer than 20
    primary members the usernames are listed below the row, with
    ``(locked)`` appended for users whose ``get_locked()`` is true.
    """
    # Bucket users by primary GID once instead of rescanning per row.
    users_by_gid = {}
    for user in users.values():
        users_by_gid.setdefault(user.gid, []).append(user)

    # Keep the first group seen for each GID.
    groups_by_gid = {}
    for group in groups.values():
        groups_by_gid.setdefault(group.gid, group)

    # The header uses the same column widths as the data rows below.
    print(f"| {'Group Name'.ljust(13)} | {'GID'.ljust(7)} | {'Member Count'.ljust(12)} |")
    print("|---------------|---------|--------------|")

    ranking = sorted(
        ((gid, len(members)) for gid, members in users_by_gid.items()),
        key=lambda x: (x[1], x[0]),
        reverse=True,
    )
    for gid, count in ranking:
        group = groups_by_gid.get(gid)
        if group is None:
            continue
        print(
            f"| {group.name.ljust(13)} | {str(gid).ljust(7)} | {str(count).ljust(12)} |"
        )
        if count < 20:
            listing = ", ".join(
                u.name + (" (locked)" if u.get_locked() else "")
                for u in users_by_gid[gid]
            )
            print(f" Members: {listing}")
|
|
|
|
|
|
def _replace_symlink(link_path: Path, target: str) -> None:
    """Point ``link_path`` at ``target``, replacing any entry already there."""
    # exists() follows symlinks and reports a dangling link as absent, which
    # would make symlink_to() fail with FileExistsError.
    if link_path.is_symlink() or link_path.exists():
        link_path.unlink()
    link_path.symlink_to(target)


def _write_json(path: Path, record: Any, *, private: bool = False, indent: int | None = 2) -> None:
    """Write ``record`` as JSON to ``path``; ``private`` restricts it to mode 0600."""
    if private:
        # Create the file with restrictive permissions from the start, then
        # chmod to also tighten a file left over from an earlier run.
        path.touch(mode=0o600, exist_ok=True)
        path.chmod(0o600)
    with open(path, "w") as f:
        json.dump(record, f, indent=indent)


def output_to_directory(
    users: dict[str, User],
    groups: dict[str, Group],
    output_dir: Path,
):
    """Write all user, group and membership records into ``output_dir``.

    For every user a ``<name>.user`` file is written plus a ``<uid>.user``
    symlink to it; when the user has a privileged record, a mode-0600
    ``<name>.user-privileged`` file and a ``<uid>.user-privileged`` symlink
    are written as well. Groups get the equivalent ``.group`` and
    ``.group-privileged`` files keyed by name and GID. Finally an empty
    ``<user>:<group>.membership`` file is written for each user's primary
    group and for every group that lists the user as a member. Users whose
    primary GID matches no group are reported on stdout.

    The directory is created (including parents) when it does not exist.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    for user in users.values():
        _write_json(output_dir / f"{user.name}.user", user.to_systemd_user_record())
        _replace_symlink(output_dir / f"{user.uid}.user", f"./{user.name}.user")

        privileged_user_record = user.to_systemd_privileged_user_record()
        if privileged_user_record:
            _write_json(
                output_dir / f"{user.name}.user-privileged",
                privileged_user_record,
                private=True,
            )
            _replace_symlink(
                output_dir / f"{user.uid}.user-privileged",
                f"./{user.name}.user-privileged",
            )

    for group in groups.values():
        _write_json(output_dir / f"{group.name}.group", group.to_systemd_group_record())
        _replace_symlink(output_dir / f"{group.gid}.group", f"./{group.name}.group")

        privileged_group_record = group.to_systemd_privileged_group_record()
        if privileged_group_record:
            _write_json(
                output_dir / f"{group.name}.group-privileged",
                privileged_group_record,
                private=True,
            )
            _replace_symlink(
                output_dir / f"{group.gid}.group-privileged",
                f"./{group.name}.group-privileged",
            )

    # Primary-group memberships.
    groups_by_gid = {group.gid: group for group in groups.values()}
    for user in users.values():
        primary_group = groups_by_gid.get(user.gid)
        if not primary_group:
            print(
                f"Warning: User {user.name} has primary GID {user.gid} which does not correspond to any group"
            )
        else:
            _write_json(
                output_dir / f"{user.name}:{primary_group.name}.membership",
                {},
                indent=None,
            )

    # Supplementary memberships, written whether or not the primary group
    # was found. Only members that are known users get a file.
    known_names = {user.name for user in users.values()}
    for group in groups.values():
        for member in group.members:
            if member in known_names:
                _write_json(
                    output_dir / f"{member}:{group.name}.membership",
                    {},
                    indent=None,
                )
|
|
|
|
|
|
# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|