From a95e497f74231ddae06a30d2c57236b1935fe7ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Barto=C5=A1?= <bartos@cesnet.cz> Date: Mon, 18 Apr 2022 21:55:33 +0200 Subject: [PATCH] Allow to edit user (plus some more changes) - "Add user" moved to separate page - Added a UserAccount class as our own representation of user account information --- main.py | 180 ++++++++++++++++++++++++++++------- requirements.txt | 1 + static/style.css | 5 + templates/add_edit_user.html | 30 ++++++ templates/base.html | 25 +++++ templates/main.html | 62 +++--------- 6 files changed, 221 insertions(+), 82 deletions(-) create mode 100644 templates/add_edit_user.html create mode 100644 templates/base.html diff --git a/main.py b/main.py index 93083ec..03b4914 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 import sys -from datetime import datetime +from datetime import datetime, timezone import os.path import re import subprocess -from typing import List,Dict +from typing import List, Dict, Optional from flask import Flask, render_template, request, make_response, redirect, flash from flask_wtf import FlaskForm @@ -13,6 +13,7 @@ from wtforms.validators import DataRequired, Email import requests import yaml +from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6 app = Flask(__name__) app.secret_key = "ASDF1234 - CHANGE ME!" @@ -47,6 +48,48 @@ def ts_to_str(ts: float) -> str: app.jinja_env.filters["ts_to_str"] = ts_to_str +# *** Our user representation *** + +@dataclass +class UserAccount: + username: str + email: str + firstname: str + lastname: str + cn: str + dn: str + kcid: Optional[str] = field(default=None) # keycloak ID + ts_created: Optional[datetime] = field(default=None) # timezone-aware datetime in UTC + + def to_keycloak_representation(self) -> Dict: + """ + Create Keycloak representation of user account information + + Ref: https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation + """ + return { + "id": self.kcid, + "username": self.username, + "firstName": self.firstname, + "lastName": self.lastname, + "email": self.email, + "attributes": { + "CN": [self.cn], + "DN": [f"CN={self.cn}"] + }, + } + + @classmethod + def from_keycloak_representation(cls, kc_user: dict) -> "UserAccount": + try: + return cls(kc_user['username'], kc_user['email'], kc_user['firstName'], kc_user['lastName'], + kc_user['attributes'].get('CN',[''])[0], kc_user['attributes'].get('DN',[''])[0], + kc_user['id'], + datetime.utcfromtimestamp(int(kc_user['createdTimestamp']/1000)).replace(tzinfo=timezone.utc)) + except KeyError as e: + raise KeycloakError(f"User representation received from Keycloak is missing attribute '{e}'") + + # *** Functions to call other APIs *** class KeycloakError(Exception): @@ -73,27 +116,47 @@ def kc_get_token() -> str: except Exception as e: raise KeycloakError(f"Can't get OIDC token for API access: {type(e).__name__}: {e}") -def kc_get_users() -> List[Dict]: +def kc_get_users() -> List[UserAccount]: """ Get list of users from Keycloak - :return List of dicts, one per user, with keys matching the Keycloak user representation: - https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation + :return List of UserAccount objects :raise KeycloakError """ token = kc_get_token() resp = requests.get(KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: - raise KeycloakError("Can't get list of users: ({resp.status_code}) {resp.text[:200]}") + raise KeycloakError(f"Can't get list of users: ({resp.status_code}) {resp.text[:200]}") try: users = resp.json() assert isinstance(users, list) and all(isinstance(o, dict) for o in users), "" + return [UserAccount.from_keycloak_representation(u) for u in users] + except (ValueError, AssertionError): + raise KeycloakError("Can't get list of users: Unexpected content of response from Keycloak") + +def kc_get_user(userid) -> UserAccount: + """ + Get details of specified user account from Keycloak + + :param userid: Keycloak user ID (not username) + :return UserAccount representation of the user + :raise KeycloakError + """ + assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID" + token = kc_get_token() + url = KEYCLOAK_USERS_URL + "/" + userid + resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) + if not resp.ok: + raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}") + try: + user = resp.json() + assert isinstance(user, dict), "" + return UserAccount.from_keycloak_representation(user) except (ValueError, AssertionError): - raise KeycloakError(f"Can't get list of users: Unexpected content of response from Keycloak") - return users + raise KeycloakError(f"Can't get user info: Unexpected content of response from Keycloak") -def kc_add_user(username: str, firstname: str, lastname: str, cn: str, email: str) -> None: +def kc_add_user(user: UserAccount) -> None: """Add a new user to Keycloak :return None @@ -101,23 +164,30 @@ def kc_add_user(username: str, firstname: str, lastname: str, cn: str, email: st """ token = kc_get_token() - user_data = { - "username": username, - "firstName": firstname, - "lastName": lastname, - "email": email, - "attributes": { - "CN": [cn], - "DN": [f"CN={cn}"] - }, - "enabled": True # user must be explicitly enabled, default is False - } + user_data = user.to_keycloak_representation() + user_data["enabled"] = True # add "enable" key, since a new user must be explicitly enabled (default is False) resp = requests.post(KEYCLOAK_USERS_URL, json=user_data, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") +def kc_update_user(user: UserAccount) -> None: + """Update an existing user in Keycloak + + :param user: UserAccount instance with "kcid" filled + :return None + :raises KeycloakError + """ + token = kc_get_token() + + user_data = user.to_keycloak_representation() + url = KEYCLOAK_USERS_URL + "/" + user.kcid + resp = requests.put(url, json=user_data, + headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) + if not resp.ok: + raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") + def kc_delete_user(userid: str) -> None: """Delete a user from Keycloak @@ -125,6 +195,7 @@ def kc_delete_user(userid: str) -> None: :return None :raise KeycloakError """ + assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID" token = kc_get_token() url = KEYCLOAK_USERS_URL + "/" + userid resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) @@ -143,21 +214,8 @@ class AddUserForm(FlaskForm): # DN is constructed automatically from CN -@app.route("/", methods=["GET", "POST"]) +@app.route("/") def main(): - # Add user form - form_add_user = AddUserForm() - if form_add_user.validate_on_submit(): - # TODO check that username doesn't exist, yet (and check validity, i.e. special characters etc.) - # Add user - try: - kc_add_user(form_add_user.username.data, form_add_user.firstname.data, form_add_user.lastname.data, - form_add_user.cn.data, form_add_user.email.data) - flash(f'User "{form_add_user.username.data}" successfully created.', "success") - return redirect("/") # Force new load of the page using GET, so page refresh doesn't trigger new POST. - except KeycloakError as e: - flash(f'Error when creating user: {e}', "error") - # Load existing users from Keycloak try: users = kc_get_users() @@ -169,6 +227,58 @@ def main(): return render_template("main.html", **locals()) +@app.route("/add_user", methods=["GET", "POST"]) +def add_user(): + """Add a new user. On GET show new-user page, on POST create new user account.""" + form_user = AddUserForm() + if form_user.validate_on_submit(): + # Form submitted and valid - create user account + user = UserAccount(username=form_user.username.data, + email=form_user.email.data, + firstname=form_user.firstname.data, + lastname=form_user.lastname.data, + cn=form_user.cn.data, + dn=f"CN={form_user.cn.data}") + try: + kc_add_user(user) + flash(f'User "{form_user.username.data}" successfully created.', "success") + return redirect("/") # Success - go back to main page + except KeycloakError as e: + flash(f'Error when creating user: {e}', "error") + return render_template("add_edit_user.html", form_user=form_user, user=None) + + +@app.route("/edit_user/<userid>", methods=["GET", "POST"]) +def edit_user(userid: str): + """Edit existing user. On GET show user details, on POST update user params with new values.""" + if request.method == "POST": + form_user = AddUserForm() # use data from POST request + if form_user.validate_on_submit(): + # Form submitted and valid - perform account update + user = UserAccount(username=form_user.username.data, + email=form_user.email.data, + firstname=form_user.firstname.data, + lastname=form_user.lastname.data, + cn=form_user.cn.data, + dn=f"CN={form_user.cn.data}", + kcid=userid) + try: + kc_update_user(user) + flash(f'User "{form_user.username.data}" successfully updated.', "success") + return redirect("/") # Success - go back to main page + except KeycloakError as e: + flash(f'Error when updating user: {e}', "error") + return render_template("add_edit_user.html", form_user=form_user, user={"kcid": userid}) + # else - method="GET" + try: + user = kc_get_user(userid) + except KeycloakError as e: + flash(f'ERROR: {e}', "error") + return redirect('/') + form_user = AddUserForm(obj=user) + return render_template("add_edit_user.html", form_user=form_user, user=user) + + @app.route("/delete_user/<userid>") def delete_user(userid: str): """Delete user given by userid and redirect back to main page""" @@ -179,10 +289,10 @@ def delete_user(userid: str): flash(f'Error when deleting user: {e}', "error") return redirect("/") -# TODO edit user? User detail page? # TODO certificates?? +# TODO other services (besides Keycloak) # When the script is run directly, run the application on a local development server. # Optionally pass two parameters, 'host' (IP to listen on) and 'port', diff --git a/requirements.txt b/requirements.txt index 5ce796c..8d67af7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ requests~=2.27.1 jinja2~=3.0.3 PyYAML~=6.0 gunicorn~=20.1.0 +dataclasses~=0.8 diff --git a/static/style.css b/static/style.css index f441703..6ab7fa8 100644 --- a/static/style.css +++ b/static/style.css @@ -9,6 +9,7 @@ table { } td { background: #fff; + padding: 0.2em 0.5em; } @@ -31,3 +32,7 @@ li.flash-error { li.flash-success { color: #090; } + +input[readonly] { + background-color: #ddd; +} \ No newline at end of file diff --git a/templates/add_edit_user.html b/templates/add_edit_user.html new file mode 100644 index 0000000..58adf03 --- /dev/null +++ b/templates/add_edit_user.html @@ -0,0 +1,30 @@ +{% extends "base.html" %} +{% block body %} + +<p><a href="{{ url_for("main") }}">← Back to list of users</a> + +{# if non-empty "user" dict is passed, edit existing user (whose params are in the dict), otherwise add new user #} +{% if user %} +<h2>Edit user</h2> +{% else %} +<h2>Add new user</h2> +{% endif %} + +<form action="{{ url_for("edit_user", userid=user.kcid) if user else url_for("add_user") }}" method="POST"> +{% if form_user.errors %} + <ul class="errors"> + {% for field, errors in form_user.errors.items() %} + <li>{{ form_user[field].label if field else "" }}: {{ ' | '.join(errors) }}</li> + {% endfor %} + </ul> +{% endif %} + {{ form_user.csrf_token }} + {{ form_user.username.label }} {{ form_user.username(size=20, readonly=True if user else False) }}<br> + {{ form_user.firstname.label }} {{ form_user.firstname(size=20) }}<br> + {{ form_user.lastname.label }} {{ form_user.lastname(size=20) }}<br> + {{ form_user.cn.label }} {{ form_user.cn(size=20) }}<br> + {{ form_user.email.label }} {{ form_user.email(size=20) }}<br> + <input type="submit" value="{{"Update user" if user else "Add user"}}"> +</form> + +{% endblock %} diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..6e60ba5 --- /dev/null +++ b/templates/base.html @@ -0,0 +1,25 @@ +<!doctype html> +<html> +<head> + <meta charset="utf-8"> + <title>SOCtools user management</title> + <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='style.css') }}"> +</head> +<body> +{# Flash messages #} +{% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + <div class="flashes"> + {% for category, message in messages %} + <li class="flash-{{category}}">{{ message }}</li> + {% endfor %} + </div> + {% endif %} +{% endwith %} + +<h1>SOCtools - User management</h1> + +{% block body %} +{% endblock %} +</body> +</html> \ No newline at end of file diff --git a/templates/main.html b/templates/main.html index 61e66c6..aa9f51b 100644 --- a/templates/main.html +++ b/templates/main.html @@ -1,60 +1,28 @@ -<!doctype html> -<html> -<head> - <meta charset="utf-8"> - <title>SOCtools user management</title> - <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='style.css') }}"> -</head> -<body> -{# Flash messages #} -{% with messages = get_flashed_messages(with_categories=true) %} - {% if messages %} - <div class="flashes"> - {% for category, message in messages %} - <li class="flash-{{category}}">{{ message }}</li> - {% endfor %} - </div> - {% endif %} -{% endwith %} +{% extends "base.html" %} +{% block body %} -<h1>SOCtools - User management</h1> +<p><a href="{{ url_for("add_user") }}">Add new user ...</a></p> <table> -<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created</th><th></th> +<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th></th> {% for user in users %} <tr> <td>{{ user.username }}</td> -<td>{{ user.firstName }}</td> -<td>{{ user.lastName }}</td> +<td>{{ user.firstname }}</td> +<td>{{ user.lastname }}</td> <td>{{ user.email }}</td> -<td>{{ user.attributes.CN[0] }}</td> -<td>{{ user.attributes.DN[0] }}</td> -<td>{{ (user.createdTimestamp/1000)|ts_to_str }}</td> -<td><a href="{{ url_for('delete_user', userid=user.id) }}" title="Delete user" - onclick="return confirm('Are you sure you want to permanently delete user account "{{user.username}}" ({{user.attributes.CN[0]}}, {{user.email}})?')">🗑</a></td> +<td>{{ user.cn }}</td> +<td>{{ user.dn }}</td> +<td>{{ user.ts_created.isoformat() }}</td> +<td> +<a href="{{ url_for('edit_user', userid=user.kcid) }}" title="Edit user">🖉</a> +<a href="{{ url_for('delete_user', userid=user.kcid) }}" title="Delete user" + onclick="return confirm('Are you sure you want to permanently delete user account "{{user.username}}" ({{user.cn}}, {{user.email}})?')">🗑</a> +</td> </tr> {#<tr><td colspan=8>{{ user }}</td></tr>#} {% endfor %} </table> -<h2>Add new user</h2> -<form action="{{ url_for("main") }}" method="POST"> -{% if form_add_user.errors %} - <ul class="errors"> - {% for field, errors in form_add_user.errors.items() %} - <li>{{ form_add_user[field].label if field else "" }}: {{ ' | '.join(errors) }}</li> - {% endfor %} - </ul> -{% endif %} - {{ form_add_user.csrf_token }} - {{ form_add_user.username.label }} {{ form_add_user.username(size=20) }}<br> - {{ form_add_user.firstname.label }} {{ form_add_user.firstname(size=20) }}<br> - {{ form_add_user.lastname.label }} {{ form_add_user.lastname(size=20) }}<br> - {{ form_add_user.cn.label }} {{ form_add_user.cn(size=20) }}<br> - {{ form_add_user.email.label }} {{ form_add_user.email(size=20) }}<br> - <input type="submit" value="Add user"> -</form> - -</body> -</html> \ No newline at end of file +{% endblock %} \ No newline at end of file -- GitLab