Filter users/groups and produce JSON records

This commit is contained in:
2026-05-29 09:19:49 +09:00
parent 6d2a37f931
commit 81e4e7786d
2 changed files with 298 additions and 44 deletions
+3
View File
@@ -1,2 +1,5 @@
out
# Nix
result result
result-* result-*
+295 -44
View File
@@ -1,7 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import annotations
from typing import Any
import argparse import argparse
import json import json
from collections import Counter
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@@ -13,40 +18,134 @@ def main():
"passwd_file", "passwd_file",
type=Path, type=Path,
help="Path to the passwd file.", help="Path to the passwd file.",
metavar="PASSWD_FILE",
) )
parser.add_argument( parser.add_argument(
"group_file", "group_file",
type=Path, type=Path,
help="Path to the group file.", help="Path to the group file.",
metavar="GROUP_FILE",
) )
parser.add_argument( parser.add_argument(
"shadow_file", "shadow_file",
type=Path, type=Path,
help="Path to the shadow file.", help="Path to the shadow file.",
metavar="SHADOW_FILE",
) )
parser.add_argument( parser.add_argument(
"output_dir", "--output_dir",
type=Path, type=Path,
help="Path to a directory to store json user records.", help="Path to a directory to store json user records.",
metavar="OUTPUT_DIR",
default="out",
) )
parser.add_argument(
"--email-domain",
type=str,
required=False,
metavar="DOMAIN",
)
parser.add_argument(
"--ignore-user-file",
type=Path,
required=False,
metavar="IGNORE_USER_FILE",
)
parser.add_argument(
"--ignore-group-file",
type=Path,
required=False,
metavar="IGNORE_GROUP_FILE",
)
args = parser.parse_args() args = parser.parse_args()
parsed_users = parse_passwd_file(args.passwd_file) users = parse_passwd_file(args.passwd_file)
parsed_groups = parse_group_file(args.group_file) groups = parse_group_file(args.group_file)
parsed_shadow = parse_shadow_file(args.shadow_file) shadow = parse_shadow_file(args.shadow_file)
for user in parsed_users: ignore_users_from_file(args.ignore_user_file, users)
print(json.dumps(user, indent=2)) ignore_groups_from_file(args.ignore_group_file, groups)
for group in parsed_groups: ensure_no_overlapping_uids(users)
print(json.dumps(group, indent=2)) ensure_no_overlapping_gids(groups)
for shadow_entry in parsed_shadow: if args.email_domain:
print(json.dumps(shadow_entry, indent=2)) for user in users.values():
user.email = f"{user.name}@{args.email_domain}"
if not args.output_dir.exists():
args.output_dir.mkdir(parents=True)
user_dir = args.output_dir / "users"
user_dir.mkdir(exist_ok=True)
group_dir = args.output_dir / "groups"
group_dir.mkdir(exist_ok=True)
# print_group_stats(groups, users)
for user in users.values():
user.fix_full_name()
user.set_is_admin(groups)
user.set_member_of(groups)
if user.name in shadow:
user.set_shadow_entry(shadow[user.name])
with open(args.output_dir / "users" / f"{user.name}.json", "w") as f:
json.dump(user.to_systemd_user_record(), f, indent=2)
for group in groups.values():
with open(args.output_dir / "groups" / f"{group.name}.json", "w") as f:
json.dump(group.to_systemd_group_record(), f, indent=2)
def parse_passwd_file(passwd_file: Path) -> list[dict]: def ensure_no_overlapping_uids(users: dict[str, User]):
users = [] uid_counts = Counter(user.uid for user in users.values())
overlapping_uids = {uid: count for uid, count in uid_counts.items() if count > 1}
if overlapping_uids:
print("Error: Found overlapping UIDs:")
for uid, count in overlapping_uids.items():
print(f" UID {uid} is used by {count} users")
raise ValueError("Overlapping UIDs found")
def ensure_no_overlapping_gids(groups: dict[str, Group]):
gid_counts = Counter(group.gid for group in groups.values())
overlapping_gids = {gid: count for gid, count in gid_counts.items() if count > 1}
if overlapping_gids:
print("Error: Found overlapping GIDs:")
for gid, count in overlapping_gids.items():
print(f" GID {gid} is used by {count} groups")
raise ValueError("Overlapping GIDs found")
def ignore_users_from_file(ignore_user_file: Path | None, users: dict[str, User]):
if ignore_user_file:
with ignore_user_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
username = line.strip()
if username in users:
del users[username]
def ignore_groups_from_file(ignore_group_file: Path | None, groups: dict[str, Group]):
if ignore_group_file:
with ignore_group_file.open("r") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
groupname = line.strip()
if groupname in groups:
del groups[groupname]
def parse_passwd_file(passwd_file: Path) -> dict[str, User]:
users = {}
with passwd_file.open("r") as f: with passwd_file.open("r") as f:
for line in f: for line in f:
if line.startswith("#") or not line.strip(): if line.startswith("#") or not line.strip():
@@ -55,31 +154,28 @@ def parse_passwd_file(passwd_file: Path) -> list[dict]:
if len(parts) < 7: if len(parts) < 7:
print(f"Warning: Skipping malformed line: {line.strip()}") print(f"Warning: Skipping malformed line: {line.strip()}")
continue continue
user = {
"name": parts[0], gecos = parts[4].split(",")
"uid": int(parts[2]),
"gid": int(parts[3]), user = User(
"gecos": parse_passwd_gecos(parts[4]), name=parts[0],
"home": parts[5], uid=int(parts[2]),
"shell": parts[6], gid=int(parts[3]),
} home=parts[5],
users.append(user) shell=parts[6],
full_name=gecos[0] if len(gecos) > 0 else None,
location=gecos[1] if len(gecos) > 1 else None,
work_phone=gecos[2] if len(gecos) > 2 else None,
home_phone=gecos[3] if len(gecos) > 3 else None,
other=gecos[4] if len(gecos) > 4 else None,
)
users[user.name] = user
return users return users
def parse_passwd_gecos(gecos: str) -> dict: def parse_group_file(group_file: Path) -> dict[str, Group]:
parts = gecos.split(",") groups = {}
return {
"full_name": parts[0] if len(parts) > 0 else "",
"room_number": parts[1] if len(parts) > 1 else "",
"work_phone": parts[2] if len(parts) > 2 else "",
"home_phone": parts[3] if len(parts) > 3 else "",
"other": parts[4] if len(parts) > 4 else "",
}
def parse_group_file(group_file: Path) -> list[dict]:
groups = []
with group_file.open("r") as f: with group_file.open("r") as f:
for line in f: for line in f:
if line.startswith("#") or not line.strip(): if line.startswith("#") or not line.strip():
@@ -88,17 +184,17 @@ def parse_group_file(group_file: Path) -> list[dict]:
if len(parts) < 4: if len(parts) < 4:
print(f"Warning: Skipping malformed line: {line.strip()}") print(f"Warning: Skipping malformed line: {line.strip()}")
continue continue
group = { group = Group(
"name": parts[0], name=parts[0],
"gid": int(parts[2]), gid=int(parts[2]),
"members": parts[3].split(",") if parts[3] else [], members=parts[3].split(",") if parts[3] else [],
} )
groups.append(group) groups[group.name] = group
return groups return groups
def parse_shadow_file(shadow_file: Path) -> list[dict]: def parse_shadow_file(shadow_file: Path) -> dict[str, dict]:
shadow_entries = [] shadow_entries = {}
with shadow_file.open("r") as f: with shadow_file.open("r") as f:
for line in f: for line in f:
if line.startswith("#") or not line.strip(): if line.startswith("#") or not line.strip():
@@ -109,7 +205,7 @@ def parse_shadow_file(shadow_file: Path) -> list[dict]:
continue continue
shadow_entry = { shadow_entry = {
"name": parts[0], "name": parts[0],
"password": parts[1], "password": parts[1] if parts[1] not in {"!", "!*", "*K*"} else None,
"last_change": int(parts[2]) if parts[2].isdigit() else None, "last_change": int(parts[2]) if parts[2].isdigit() else None,
"min_days": int(parts[3]) if parts[3].isdigit() else None, "min_days": int(parts[3]) if parts[3].isdigit() else None,
"max_days": int(parts[4]) if parts[4].isdigit() else None, "max_days": int(parts[4]) if parts[4].isdigit() else None,
@@ -118,9 +214,164 @@ def parse_shadow_file(shadow_file: Path) -> list[dict]:
"expire_date": int(parts[7]) if parts[7].isdigit() else None, "expire_date": int(parts[7]) if parts[7].isdigit() else None,
"reserved": parts[8], "reserved": parts[8],
} }
shadow_entries.append(shadow_entry) shadow_entries[shadow_entry["name"]] = shadow_entry
return shadow_entries return shadow_entries
@dataclass
class User:
name: str
uid: int
gid: int
home: str
shell: str
full_name: str | None = None
location: str | None = None
work_phone: str | None = None
home_phone: str | None = None
other: str | None = None
memberOf: list[str] | None = None
email: str | None = None
is_admin: bool = False
password: str | None = None
password_last_change: int | None = None
password_min_days: int | None = None
password_max_days: int | None = None
password_warn_days: int | None = None
password_inactive_days: int | None = None
password_expire_date: int | None = None
password_reserved: str | None = None
def fix_full_name(self):
if self.full_name is None:
return
self.full_name = (
self.full_name.strip()
.replace("{", "æ")
.replace("[", "æ")
.replace("|", "ø")
.replace("\\", "Ø")
.replace("}", "å")
.replace("]", "Å")
)
def set_is_admin(self, groups: dict[str, Group]) -> bool:
if self.gid == 0:
self.is_admin = True
if (
self.name
in groups.get("wheel", Group(name="wheel", gid=0, members=[])).members
):
self.is_admin = True
self.is_admin = False
return self.is_admin
def set_member_of(self, groups: dict[str, Group]) -> list[str]:
self.memberOf = []
for group in groups.values():
if self.name in group.members:
self.memberOf.append(group.name)
return self.memberOf
def set_shadow_entry(self, shadow_entry: dict):
self.password = shadow_entry.get("password")
self.password_last_change = shadow_entry.get("last_change")
self.password_min_days = shadow_entry.get("min_days")
self.password_max_days = shadow_entry.get("max_days")
self.password_warn_days = shadow_entry.get("warn_days")
self.password_inactive_days = shadow_entry.get("inactive_days")
self.password_expire_date = shadow_entry.get("expire_date")
self.password_reserved = shadow_entry.get("reserved")
def get_disposition(self) -> str:
if self.uid == 0:
return "intrinsic"
if self.uid < 1000:
return "system"
return "regular"
def get_locked(self) -> bool:
return self.shell in [
# PVV specific
"/bin/sperret",
"/bin/msgsh",
]
def to_systemd_user_record(self) -> dict[str, Any]:
result: dict[str, Any] = {
"userName": self.name,
"realName": self.full_name,
"disposition": self.get_disposition(),
"shell": self.shell,
"umask": "0022",
"locked": self.get_locked(),
"storage": "classic",
"uid": self.uid,
"gid": self.gid,
"homeDirectory": self.home,
# "mountNoDevices": True,
# "mountNoSuid": True,
# "mountNoExecute": False,
}
if self.email:
result["emailAddress"] = self.email
if self.location:
result["location"] = self.location
if self.memberOf:
result["memberOf"] = self.memberOf
if self.password is not None:
result["privileged"] = {
"hashedPassword": self.password,
}
return result
@dataclass
class Group:
name: str
gid: int
members: list[str]
def to_systemd_group_record(self) -> dict[str, Any]:
return {
"groupName": self.name,
"gid": self.gid,
"members": self.members,
}
def print_group_stats(
groups: dict[str, Group],
users: dict[str, User],
):
group_counter = Counter()
for user in users.values():
group_counter[user.gid] += 1
print("| Group Name | GID | Member Count |")
print("|---------------|---------|--------------|")
for gid, count in sorted(
group_counter.items(), key=lambda x: (x[1], x[0]), reverse=True
):
group = next((g for g in groups.values() if g.gid == gid), None)
if group:
print(
f"| {group.name.ljust(13)} | {str(gid).ljust(7)} | {str(count).ljust(12)} |"
)
if count < 20:
print(
f" Members: {', '.join(u.name + (' (locked)' if u.get_locked() else '') for u in users.values() if u.gid == gid)}"
)
if __name__ == "__main__": if __name__ == "__main__":
main() main()