From e09d1cc3e13eaa2e4f34dd8c9e4208f9bbdaa03c Mon Sep 17 00:00:00 2001 From: h7x4 Date: Mon, 26 Aug 2024 21:01:40 +0200 Subject: [PATCH] bekkalokk/gitea/import-users: refactor + add members to groups --- .../gitea/import-users/gitea-import-users.py | 162 +++++++++++++----- 1 file changed, 123 insertions(+), 39 deletions(-) diff --git a/hosts/bekkalokk/services/gitea/import-users/gitea-import-users.py b/hosts/bekkalokk/services/gitea/import-users/gitea-import-users.py index 7211c64..f13eee3 100644 --- a/hosts/bekkalokk/services/gitea/import-users/gitea-import-users.py +++ b/hosts/bekkalokk/services/gitea/import-users/gitea-import-users.py @@ -2,18 +2,89 @@ import requests import secrets import os + EMAIL_DOMAIN = os.getenv('EMAIL_DOMAIN') if EMAIL_DOMAIN is None: EMAIL_DOMAIN = 'pvv.ntnu.no' + API_TOKEN = os.getenv('API_TOKEN') if API_TOKEN is None: raise Exception('API_TOKEN not set') + GITEA_API_URL = os.getenv('GITEA_API_URL') if GITEA_API_URL is None: GITEA_API_URL = 'https://git.pvv.ntnu.no/api/v1' + +def gitea_list_all_users() -> dict[str, dict[str, any]] | None: + r = requests.get( + GITEA_API_URL + '/admin/users', + headers={'Authorization': 'token ' + API_TOKEN} + ) + + if r.status_code != 200: + print('Failed to get users:', r.text) + return None + + return {user['login'] : user for user in r.json()} + + +def gitea_create_user(username: str, userdata: dict[str,any]) -> bool: + r = requests.post( + GITEA_API_URL + '/admin/users', + json=userdata, + headers={'Authorization': 'token ' + API_TOKEN}, + ) + + if r.status_code != 201: + print(f'ERR: Failed to create user {username}:', r.text) + return False + + return True + + +def gitea_edit_user(username: str, userdata: dict[str,any]) -> bool: + r = requests.patch( + GITEA_API_URL + f'/admin/users/{username}', + json=userdata, + headers={'Authorization': 'token ' + API_TOKEN}, + ) + + if r.status_code != 200: + print(f'ERR: Failed to update user {username}:', r.text) + return False + + return True + + +def gitea_list_teams_for_organization(org: str) -> dict[str, any] | None: + r = requests.get( + GITEA_API_URL + f'/orgs/{org}/teams', + headers={'Authorization': 'token ' + API_TOKEN}, + ) + + if r.status_code != 200: + print(f"ERR: Failed to list teams for {org}:", r.text) + return None + + return {team['name'] : team for team in r.json()} + + +def gitea_add_user_to_organization_team(team_id: int, username: str) -> bool: + r = requests.put( + GITEA_API_URL + f'/teams/{team_id}/members/{username}', + headers={'Authorization': 'token ' + API_TOKEN}, + ) + + if r.status_code != 204: + print(f'ERR: Failed to add user {username} to org team {team_id}:', r.text) + return False + + return True + + BANNED_SHELLS = [ "/usr/bin/nologin", "/usr/sbin/nologin", @@ -22,12 +93,26 @@ BANNED_SHELLS = [ "/bin/msgsh", ] -existing_users = {} + +# Reads out a passwd-file line for line, and filters out +# real PVV users (as opposed to system users meant for daemons and such) +def passwd_file_parser(passwd_path): + with open(passwd_path, 'r') as f: + for line in f.readlines(): + uid = int(line.split(':')[2]) + if uid < 1000: + continue + + shell = line.split(':')[-1] + if shell in BANNED_SHELLS: + continue + + username = line.split(':')[0] + name = line.split(':')[4].split(',')[0] + yield (username, name) -# This function should only ever be called when adding users -# from the passwd file -def add_user(username, name): +def add_or_patch_gitea_user(username: str, name: str, existing_users: dict[str, dict[str, any]]) -> None: user = { "full_name": name, "username": username, @@ -41,53 +126,52 @@ def add_user(username, name): user["visibility"] = "private" user["email"] = username + '@' + EMAIL_DOMAIN - r = requests.post(GITEA_API_URL + '/admin/users', json=user, - headers={'Authorization': 'token ' + API_TOKEN}) - if r.status_code != 201: - print('ERR: Failed to create user ' + username + ': ' + r.text) + if not gitea_create_user(username, user): return - print('Created user ' + username) + print('Created user', username) existing_users[username] = user else: user["visibility"] = existing_users[username]["visibility"] - r = requests.patch(GITEA_API_URL + f'/admin/users/{username}', - json=user, - headers={'Authorization': 'token ' + API_TOKEN}) - if r.status_code != 200: - print('ERR: Failed to update user ' + username + ': ' + r.text) + + if not gitea_edit_user(username, user): return - print('Updated user ' + username) + print('Updated user', username) + + +def ensure_gitea_user_is_part_of_team(username: str, org: str, team_name: str) -> None: + teams = gitea_list_teams_for_organization(org) + + if teams is None: + return + + if team_name not in teams: + print(f'ERR: could not find team "{team_name}" in organization "{org}"') + + gitea_add_user_to_organization_team(username, teams[team_name].id) + + print(f'User {username} is now part of {org}/{team_name}') + + +COMMON_USER_TEAMS = [ + ("Projects", "Members"), + ("Kurs", "Members"), +] def main(): - # Fetch existing users - r = requests.get(GITEA_API_URL + '/admin/users', - headers={'Authorization': 'token ' + API_TOKEN}) + existing_users = gitea_list_all_users() + if existing_users is None: + exit(1) - if r.status_code != 200: - raise Exception('Failed to get users: ' + r.text) - - for user in r.json(): - existing_users[user['login']] = user - - # Read the file, add each user - with open("/tmp/passwd-import", 'r') as f: - for line in f.readlines(): - uid = int(line.split(':')[2]) - if uid < 1000: - continue - - shell = line.split(':')[-1] - if shell in BANNED_SHELLS: - continue - - username = line.split(':')[0] - name = line.split(':')[4].split(',')[0] - - add_user(username, name) + for username, name in passwd_file_parser("/tmp/passwd-import"): + print(f"Processing {username}") + add_or_patch_gitea_user(username, name, existing_users) + for org, team_name in COMMON_USER_TEAMS: + ensure_gitea_user_is_part_of_team(username, org, team_name) + print() if __name__ == '__main__':