From 864a4636f38d33f66943a7f672a9c40900a78884 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Barto=C5=A1?= <bartos@cesnet.cz> Date: Mon, 18 Apr 2022 17:12:08 +0200 Subject: [PATCH] possibility to add and delete users --- main.py | 121 +++++++++++++++++++++++++++++++++----------- templates/main.html | 4 +- 2 files changed, 94 insertions(+), 31 deletions(-) diff --git a/main.py b/main.py index 30d3276..93083ec 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,7 @@ from datetime import datetime import os.path import re import subprocess +from typing import List,Dict from flask import Flask, render_template, request, make_response, redirect, flash from flask_wtf import FlaskForm @@ -25,13 +26,14 @@ KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/ke @app.before_first_request def load_config(): """Load various variables, api keys, etc. and set configuration parameters""" - global SOCTOOLSPROXY, KEYCLOAK_BASE_URL, KEYCLOAK_ADMIN_PASSWORD + global SOCTOOLSPROXY, KEYCLOAK_BASE_URL, KEYCLOAK_USERS_URL, KEYCLOAK_ADMIN_PASSWORD variables = yaml.safe_load(open(VARIABLES_FILE, "r")) # Get FQDN of the main server SOCTOOLSPROXY = variables["soctoolsproxy"] assert re.match('[a-zA-Z0-9.-]+', SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{VARIABLES_FILE}' is not a valid domain name." # Set base URL to Keycloak KEYCLOAK_BASE_URL = f"https://{SOCTOOLSPROXY}:12443" + KEYCLOAK_USERS_URL = KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users" # Load API key for Keycloak KEYCLOAK_ADMIN_PASSWORD = open(KEYCLOAK_ADMIN_PASSWORD_FILE, "r").read(100).strip() # read max 100 B, the key should never be so long print(f"Config loaded:\nSOCTOOLSPROXY={SOCTOOLSPROXY}\nKEYCLOAK_BASE_URL={KEYCLOAK_BASE_URL}\n" @@ -39,7 +41,7 @@ def load_config(): # *** Custom Jinja filters *** -def ts_to_str(ts): +def ts_to_str(ts: float) -> str: return datetime.utcfromtimestamp(int(ts)).isoformat(sep=" ") # TODO Do Keycloak really use UTC timestamps? app.jinja_env.filters["ts_to_str"] = ts_to_str @@ -47,8 +49,15 @@ app.jinja_env.filters["ts_to_str"] = ts_to_str # *** Functions to call other APIs *** -def get_token(): - """Get admin's OIDC token from Keycloak - needed to perform any administrative API call""" +class KeycloakError(Exception): + pass + +def kc_get_token() -> str: + """ + Get admin's OIDC token from Keycloak - needed to perform any administrative API call + + Return the token or raise KeycloakError + """ url = KEYCLOAK_BASE_URL + "/auth/realms/master/protocol/openid-connect/token" data = { "client_id": "admin-cli", @@ -59,32 +68,70 @@ def get_token(): try: resp = requests.post(url, data, verify=CA_CERT_FILE) if resp.status_code != 200: - flash(f"ERROR: Can't get token for API access: ({resp.status_code}) {resp.text[:200]}", "error") - return None + raise KeycloakError(f"Can't get OIDC token for API access: ({resp.status_code}) {resp.text[:200]}") return str(resp.json()['access_token']) except Exception as e: - flash(f"ERROR: Can't get token for API access: {type(e).__name__}: {e}", "error") - return None - -def get_users(): - # Get list of users from Keycloak - url = KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users" - token = get_token() - if token is None: - return [] # can't get token, error message is already flashed by get_token function - resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) + raise KeycloakError(f"Can't get OIDC token for API access: {type(e).__name__}: {e}") + +def kc_get_users() -> List[Dict]: + """ + Get list of users from Keycloak + + :return List of dicts, one per user, with keys matching the Keycloak user representation: + https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation + :raise KeycloakError + """ + token = kc_get_token() + resp = requests.get(KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: - flash(f"ERROR: Can't get list of users: ({resp.status_code}) {resp.text[:200]}", "error") - return [] + raise KeycloakError("Can't get list of users: ({resp.status_code}) {resp.text[:200]}") try: users = resp.json() assert isinstance(users, list) and all(isinstance(o, dict) for o in users), "" except (ValueError, AssertionError): - flash(f"ERROR: Can't get list of users: Unexpected content of response from Keycloak", "error") - return [] + raise KeycloakError(f"Can't get list of users: Unexpected content of response from Keycloak") return users +def kc_add_user(username: str, firstname: str, lastname: str, cn: str, email: str) -> None: + """Add a new user to Keycloak + + :return None + :raises KeycloakError + """ + token = kc_get_token() + + user_data = { + "username": username, + "firstName": firstname, + "lastName": lastname, + "email": email, + "attributes": { + "CN": [cn], + "DN": [f"CN={cn}"] + }, + "enabled": True # user must be explicitly enabled, default is False + } + resp = requests.post(KEYCLOAK_USERS_URL, json=user_data, + headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) + if not resp.ok: + raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") + + +def kc_delete_user(userid: str) -> None: + """Delete a user from Keycloak + + :param userid: Keycloak user ID (not username) + :return None + :raise KeycloakError + """ + token = kc_get_token() + url = KEYCLOAK_USERS_URL + "/" + userid + resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) + if not resp.ok: + raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") + + # *** Flask endpoints and forms *** class AddUserForm(FlaskForm): @@ -98,25 +145,39 @@ class AddUserForm(FlaskForm): @app.route("/", methods=["GET", "POST"]) def main(): - # Load existing users from Keycloak - users = get_users() - #print(users) - # Add user form form_add_user = AddUserForm() if form_add_user.validate_on_submit(): # TODO check that username doesn't exist, yet (and check validity, i.e. special characters etc.) - # TODO add user - result = subprocess.run(["echo", "test"]) - if result.returncode == 0: + # Add user + try: + kc_add_user(form_add_user.username.data, form_add_user.firstname.data, form_add_user.lastname.data, + form_add_user.cn.data, form_add_user.email.data) flash(f'User "{form_add_user.username.data}" successfully created.', "success") - else: - flash(f'Error when creating user: {result.stderr}', "error") + return redirect("/") # Force new load of the page using GET, so page refresh doesn't trigger new POST. + except KeycloakError as e: + flash(f'Error when creating user: {e}', "error") + + # Load existing users from Keycloak + try: + users = kc_get_users() + except KeycloakError as e: + flash(f"ERROR: {e}", "error") + users = [] + #print(users) return render_template("main.html", **locals()) -# TODO AJAX endpoint to delete user +@app.route("/delete_user/<userid>") +def delete_user(userid: str): + """Delete user given by userid and redirect back to main page""" + try: + kc_delete_user(userid) + flash(f'User successfully deleted.', "success") + except KeycloakError as e: + flash(f'Error when deleting user: {e}', "error") + return redirect("/") # TODO edit user? User detail page? diff --git a/templates/main.html b/templates/main.html index bf3b73e..61e66c6 100644 --- a/templates/main.html +++ b/templates/main.html @@ -31,8 +31,10 @@ <td>{{ user.attributes.CN[0] }}</td> <td>{{ user.attributes.DN[0] }}</td> <td>{{ (user.createdTimestamp/1000)|ts_to_str }}</td> -<td>...</td> +<td><a href="{{ url_for('delete_user', userid=user.id) }}" title="Delete user" + onclick="return confirm('Are you sure you want to permanently delete user account "{{user.username}}" ({{user.attributes.CN[0]}}, {{user.email}})?')">🗑</a></td> </tr> +{#<tr><td colspan=8>{{ user }}</td></tr>#} {% endfor %} </table> -- GitLab