import os
import secrets
from typing import Any

import requests

# Domain appended to a username to build the e-mail address of new accounts.
EMAIL_DOMAIN = os.getenv('EMAIL_DOMAIN', 'pvv.ntnu.no')

# Token sent in the Authorization header of every request; mandatory.
API_TOKEN = os.getenv('API_TOKEN')
if API_TOKEN is None:
    raise Exception('API_TOKEN not set')

GITEA_API_URL = os.getenv('GITEA_API_URL', 'https://git.pvv.ntnu.no/api/v1')

# Seconds to wait for a single HTTP request before giving up, so that a
# stalled server cannot hang the whole import.
_REQUEST_TIMEOUT = 30

# Number of items requested per page from list endpoints.
_PAGE_SIZE = 50


def _auth_headers() -> dict[str, str]:
    """Return the HTTP headers that authenticate a request with API_TOKEN."""
    return {'Authorization': 'token ' + API_TOKEN}


def _gitea_get_all_pages(
    path: str,
    error_message: str,
) -> list[dict[str, Any]] | None:
    """Fetch every page of a list endpoint and return the combined items.

    Pages are requested one after another until the server returns an empty
    page or the number of collected items reaches the `X-Total-Count`
    response header (when the server sends one).

    Args:
        path: Endpoint path relative to GITEA_API_URL, starting with '/'.
        error_message: Text printed in front of the response body when a
            request does not answer with HTTP 200.

    Returns:
        The concatenated list of JSON objects, or None if any page failed.
    """
    items: list[dict[str, Any]] = []
    page = 1
    while True:
        r = requests.get(
            GITEA_API_URL + path,
            params={'page': page, 'limit': _PAGE_SIZE},
            headers=_auth_headers(),
            timeout=_REQUEST_TIMEOUT,
        )
        if r.status_code != 200:
            print(error_message, r.text)
            return None

        batch = r.json()
        if not batch:
            break
        items.extend(batch)

        # Stop early when the server tells us we already have everything;
        # this saves the final request that would return an empty page.
        total = r.headers.get('X-Total-Count', '')
        if total.isdigit() and len(items) >= int(total):
            break
        page += 1
    return items


def gitea_list_all_users() -> dict[str, dict[str, Any]] | None:
    """Return all users known to Gitea, keyed by their login name.

    Returns:
        A mapping from `login` to the user object returned by the API,
        or None if the listing failed (the error is printed).
    """
    users = _gitea_get_all_pages('/admin/users', 'Failed to get users:')
    if users is None:
        return None
    return {user['login']: user for user in users}


def gitea_create_user(username: str, userdata: dict[str, Any]) -> bool:
    """Create a new user from `userdata`.

    Args:
        username: Only used for the error message.
        userdata: JSON body sent to the API.

    Returns:
        True if the API answered with HTTP 201, False otherwise.
    """
    r = requests.post(
        GITEA_API_URL + '/admin/users',
        json=userdata,
        headers=_auth_headers(),
        timeout=_REQUEST_TIMEOUT,
    )
    if r.status_code != 201:
        print(f'ERR: Failed to create user {username}:', r.text)
        return False
    return True


def gitea_edit_user(username: str, userdata: dict[str, Any]) -> bool:
    """Patch the existing user `username` with the fields in `userdata`.

    Returns:
        True if the API answered with HTTP 200, False otherwise.
    """
    r = requests.patch(
        GITEA_API_URL + f'/admin/users/{username}',
        json=userdata,
        headers=_auth_headers(),
        timeout=_REQUEST_TIMEOUT,
    )
    if r.status_code != 200:
        print(f'ERR: Failed to update user {username}:', r.text)
        return False
    return True


def gitea_list_teams_for_organization(org: str) -> dict[str, dict[str, Any]] | None:
    """Return all teams of the organization `org`, keyed by team name.

    Returns:
        A mapping from team `name` to the team object returned by the API,
        or None if the listing failed (the error is printed).
    """
    teams = _gitea_get_all_pages(
        f'/orgs/{org}/teams',
        f"ERR: Failed to list teams for {org}:",
    )
    if teams is None:
        return None
    return {team['name']: team for team in teams}


def gitea_add_user_to_organization_team(team_id: int, username: str) -> bool:
    """Add `username` as a member of the team with id `team_id`.

    Returns:
        True if the API answered with HTTP 204, False otherwise.
    """
    r = requests.put(
        GITEA_API_URL + f'/teams/{team_id}/members/{username}',
        headers=_auth_headers(),
        timeout=_REQUEST_TIMEOUT,
    )
    if r.status_code != 204:
        print(f'ERR: Failed to add user {username} to org team {team_id}:', r.text)
        return False
    return True


# Login shells that mark an account as one that should not be imported.
BANNED_SHELLS = [
    "/usr/bin/nologin",
    "/usr/sbin/nologin",
    "/sbin/nologin",
    "/bin/false",
    "/bin/msgsh",
]

# Reads a passwd file line by line and filters out the real PVV users
# (as opposed to system users meant for daemons and such).
def passwd_file_parser(passwd_path, min_uid=1000, banned_shells=None):
    """Yield `(username, full_name)` for every regular user in a passwd file.

    A line is skipped when its UID is below `min_uid` or when its login
    shell is one of `banned_shells`. Blank lines and `#` comment lines are
    ignored; lines that cannot be parsed are reported and skipped instead of
    aborting the whole import.

    Args:
        passwd_path: Path of the passwd-formatted file to read.
        min_uid: Lowest UID that counts as a regular user.
        banned_shells: Collection of login shells to exclude. Defaults to
            the module-level BANNED_SHELLS.

    Yields:
        Tuples of the login name and the first comma-separated part of the
        GECOS field.
    """
    if banned_shells is None:
        banned_shells = BANNED_SHELLS

    with open(passwd_path, 'r') as f:
        for lineno, raw_line in enumerate(f, start=1):
            # Drop the line terminator: otherwise the shell field would end
            # in '\n' and never compare equal to an entry in banned_shells.
            line = raw_line.rstrip('\r\n')
            if not line.strip() or line.startswith('#'):
                continue

            fields = line.split(':')
            if len(fields) < 7:
                print(f'WARN: skipping malformed passwd line {lineno}')
                continue

            try:
                uid = int(fields[2])
            except ValueError:
                print(f'WARN: skipping passwd line {lineno} with invalid uid')
                continue

            if uid < min_uid:
                continue
            if fields[6] in banned_shells:
                continue

            username = fields[0]
            name = fields[4].split(',')[0]
            yield (username, name)


def add_or_patch_gitea_user(
    username: str,
    name: str,
    existing_users: dict[str, dict[str, object]],
) -> None:
    """Create the Gitea account for `username`, or update it if it exists.

    New accounts get a random password, private visibility and an e-mail
    address built from EMAIL_DOMAIN. Existing accounts keep their current
    visibility. On successful creation the new user is added to
    `existing_users`, which is therefore mutated in place.

    Args:
        username: Login name of the account.
        name: Full name to store on the account.
        existing_users: Users already present in Gitea, keyed by login name.
    """
    user = {
        "full_name": name,
        "username": username,
        "login_name": username,
        "source_id": 1,  # 1 = SMTP
    }

    if username in existing_users:
        # Preserve whatever visibility the user already has.
        user["visibility"] = existing_users[username]["visibility"]
        if gitea_edit_user(username, user):
            print('Updated user', username)
        return

    user["password"] = secrets.token_urlsafe(32)
    user["must_change_password"] = False
    user["visibility"] = "private"
    user["email"] = username + '@' + EMAIL_DOMAIN

    if not gitea_create_user(username, user):
        return

    print('Created user', username)
    existing_users[username] = user


def ensure_gitea_user_is_part_of_team(
    username: str,
    org: str,
    team_name: str,
) -> None:
    """Add `username` to the team `team_name` of organization `org`.

    Nothing is done (beyond printing an error) when the teams of the
    organization cannot be listed or when the team does not exist. The
    success message is only printed when the API call succeeded.
    """
    teams = gitea_list_teams_for_organization(org)
    if teams is None:
        return

    if team_name not in teams:
        print(f'ERR: could not find team "{team_name}" in organization "{org}"')
        return

    # The API helper expects (team_id, username); teams maps names to the
    # team objects (dicts) returned by the API, so the id is looked up by key.
    team_id = teams[team_name]['id']
    if gitea_add_user_to_organization_team(team_id, username):
        print(f'User {username} is now part of {org}/{team_name}')


# (organization, team) pairs that every imported user is added to.
COMMON_USER_TEAMS = [
    ("Projects", "Members"),
    ("Kurs", "Members"),
]


def main(passwd_path: str = "/tmp/passwd-import") -> None:
    """Import every regular user from `passwd_path` into Gitea.

    Exits with status 1 when the existing Gitea users cannot be listed.
    """
    existing_users = gitea_list_all_users()
    if existing_users is None:
        # The builtin exit() is injected by the `site` module and is not
        # guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)

    for username, name in passwd_file_parser(passwd_path):
        print(f"Processing {username}")
        add_or_patch_gitea_user(username, name, existing_users)
        for org, team_name in COMMON_USER_TEAMS:
            ensure_gitea_user_is_part_of_team(username, org, team_name)
        print()


if __name__ == '__main__':
    main()