Skip to content
Snippets Groups Projects
Commit a95e497f authored by Václav Bartoš's avatar Václav Bartoš
Browse files

Allow to edit user (plus some more changes)

- "Add user" moved to separate page
- Added a UserAccount class as our own representation of user account information
parent 864a4636
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import sys
from datetime import datetime
from datetime import datetime, timezone
import os.path
import re
import subprocess
from typing import List,Dict
from typing import List, Dict, Optional
from flask import Flask, render_template, request, make_response, redirect, flash
from flask_wtf import FlaskForm
......@@ -13,6 +13,7 @@ from wtforms.validators import DataRequired, Email
import requests
import yaml
from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6
app = Flask(__name__)
app.secret_key = "ASDF1234 - CHANGE ME!"
......@@ -47,6 +48,48 @@ def ts_to_str(ts: float) -> str:
app.jinja_env.filters["ts_to_str"] = ts_to_str
# *** Our user representation ***
@dataclass
class UserAccount:
username: str
email: str
firstname: str
lastname: str
cn: str
dn: str
kcid: Optional[str] = field(default=None) # keycloak ID
ts_created: Optional[datetime] = field(default=None) # timezone-aware datetime in UTC
def to_keycloak_representation(self) -> Dict:
"""
Create Keycloak representation of user account information
Ref: https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation
"""
return {
"id": self.kcid,
"username": self.username,
"firstName": self.firstname,
"lastName": self.lastname,
"email": self.email,
"attributes": {
"CN": [self.cn],
"DN": [f"CN={self.cn}"]
},
}
@classmethod
def from_keycloak_representation(cls, kc_user: dict) -> "UserAccount":
try:
return cls(kc_user['username'], kc_user['email'], kc_user['firstName'], kc_user['lastName'],
kc_user['attributes'].get('CN',[''])[0], kc_user['attributes'].get('DN',[''])[0],
kc_user['id'],
datetime.utcfromtimestamp(int(kc_user['createdTimestamp']/1000)).replace(tzinfo=timezone.utc))
except KeyError as e:
raise KeycloakError(f"User representation received from Keycloak is missing attribute '{e}'")
# *** Functions to call other APIs ***
class KeycloakError(Exception):
......@@ -73,27 +116,47 @@ def kc_get_token() -> str:
except Exception as e:
raise KeycloakError(f"Can't get OIDC token for API access: {type(e).__name__}: {e}")
def kc_get_users() -> List[Dict]:
def kc_get_users() -> List[UserAccount]:
"""
Get list of users from Keycloak
:return List of dicts, one per user, with keys matching the Keycloak user representation:
https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation
:return List of UserAccount objects
:raise KeycloakError
"""
token = kc_get_token()
resp = requests.get(KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
raise KeycloakError("Can't get list of users: ({resp.status_code}) {resp.text[:200]}")
raise KeycloakError(f"Can't get list of users: ({resp.status_code}) {resp.text[:200]}")
try:
users = resp.json()
assert isinstance(users, list) and all(isinstance(o, dict) for o in users), ""
return [UserAccount.from_keycloak_representation(u) for u in users]
except (ValueError, AssertionError):
raise KeycloakError("Can't get list of users: Unexpected content of response from Keycloak")
def kc_get_user(userid) -> UserAccount:
"""
Get details of specified user account from Keycloak
:param userid: Keycloak user ID (not username)
:return UserAccount representation of the user
:raise KeycloakError
"""
assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID"
token = kc_get_token()
url = KEYCLOAK_USERS_URL + "/" + userid
resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}")
try:
user = resp.json()
assert isinstance(user, dict), ""
return UserAccount.from_keycloak_representation(user)
except (ValueError, AssertionError):
raise KeycloakError(f"Can't get list of users: Unexpected content of response from Keycloak")
return users
raise KeycloakError(f"Can't get user info: Unexpected content of response from Keycloak")
def kc_add_user(username: str, firstname: str, lastname: str, cn: str, email: str) -> None:
def kc_add_user(user: UserAccount) -> None:
"""Add a new user to Keycloak
:return None
......@@ -101,23 +164,30 @@ def kc_add_user(username: str, firstname: str, lastname: str, cn: str, email: st
"""
token = kc_get_token()
user_data = {
"username": username,
"firstName": firstname,
"lastName": lastname,
"email": email,
"attributes": {
"CN": [cn],
"DN": [f"CN={cn}"]
},
"enabled": True # user must be explicitly enabled, default is False
}
user_data = user.to_keycloak_representation()
user_data["enabled"] = True # add "enable" key, since a new user must be explicitly enabled (default is False)
resp = requests.post(KEYCLOAK_USERS_URL, json=user_data,
headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
def kc_update_user(user: UserAccount) -> None:
"""Update an existing user in Keycloak
:param user: UserAccount instance with "kcid" filled
:return None
:raises KeycloakError
"""
token = kc_get_token()
user_data = user.to_keycloak_representation()
url = KEYCLOAK_USERS_URL + "/" + user.kcid
resp = requests.put(url, json=user_data,
headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
def kc_delete_user(userid: str) -> None:
"""Delete a user from Keycloak
......@@ -125,6 +195,7 @@ def kc_delete_user(userid: str) -> None:
:return None
:raise KeycloakError
"""
assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID"
token = kc_get_token()
url = KEYCLOAK_USERS_URL + "/" + userid
resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
......@@ -143,21 +214,8 @@ class AddUserForm(FlaskForm):
# DN is constructed automatically from CN
@app.route("/", methods=["GET", "POST"])
@app.route("/")
def main():
# Add user form
form_add_user = AddUserForm()
if form_add_user.validate_on_submit():
# TODO check that username doesn't exist, yet (and check validity, i.e. special characters etc.)
# Add user
try:
kc_add_user(form_add_user.username.data, form_add_user.firstname.data, form_add_user.lastname.data,
form_add_user.cn.data, form_add_user.email.data)
flash(f'User "{form_add_user.username.data}" successfully created.', "success")
return redirect("/") # Force new load of the page using GET, so page refresh doesn't trigger new POST.
except KeycloakError as e:
flash(f'Error when creating user: {e}', "error")
# Load existing users from Keycloak
try:
users = kc_get_users()
......@@ -169,6 +227,58 @@ def main():
return render_template("main.html", **locals())
@app.route("/add_user", methods=["GET", "POST"])
def add_user():
"""Add a new user. On GET show new-user page, on POST create new user account."""
form_user = AddUserForm()
if form_user.validate_on_submit():
# Form submitted and valid - create user account
user = UserAccount(username=form_user.username.data,
email=form_user.email.data,
firstname=form_user.firstname.data,
lastname=form_user.lastname.data,
cn=form_user.cn.data,
dn=f"CN={form_user.cn.data}")
try:
kc_add_user(user)
flash(f'User "{form_user.username.data}" successfully created.', "success")
return redirect("/") # Success - go back to main page
except KeycloakError as e:
flash(f'Error when creating user: {e}', "error")
return render_template("add_edit_user.html", form_user=form_user, user=None)
@app.route("/edit_user/<userid>", methods=["GET", "POST"])
def edit_user(userid: str):
"""Edit existing user. On GET show user details, on POST update user params with new values."""
if request.method == "POST":
form_user = AddUserForm() # use data from POST request
if form_user.validate_on_submit():
# Form submitted and valid - perform account update
user = UserAccount(username=form_user.username.data,
email=form_user.email.data,
firstname=form_user.firstname.data,
lastname=form_user.lastname.data,
cn=form_user.cn.data,
dn=f"CN={form_user.cn.data}",
kcid=userid)
try:
kc_update_user(user)
flash(f'User "{form_user.username.data}" successfully updated.', "success")
return redirect("/") # Success - go back to main page
except KeycloakError as e:
flash(f'Error when updating user: {e}', "error")
return render_template("add_edit_user.html", form_user=form_user, user={"kcid": userid})
# else - method="GET"
try:
user = kc_get_user(userid)
except KeycloakError as e:
flash(f'ERROR: {e}', "error")
return redirect('/')
form_user = AddUserForm(obj=user)
return render_template("add_edit_user.html", form_user=form_user, user=user)
@app.route("/delete_user/<userid>")
def delete_user(userid: str):
"""Delete user given by userid and redirect back to main page"""
......@@ -179,10 +289,10 @@ def delete_user(userid: str):
flash(f'Error when deleting user: {e}', "error")
return redirect("/")
# TODO edit user? User detail page?
# TODO certificates??
# TODO other services (besides Keycloak)
# When the script is run directly, run the application on a local development server.
# Optionally pass two parameters, 'host' (IP to listen on) and 'port',
......
......@@ -6,3 +6,4 @@ requests~=2.27.1
jinja2~=3.0.3
PyYAML~=6.0
gunicorn~=20.1.0
dataclasses~=0.8
......@@ -9,6 +9,7 @@ table {
}
td {
background: #fff;
padding: 0.2em 0.5em;
}
......@@ -31,3 +32,7 @@ li.flash-error {
li.flash-success {
color: #090;
}
input[readonly] {
background-color: #ddd;
}
\ No newline at end of file
{% extends "base.html" %}
{% block body %}
<p><a href="{{ url_for("main") }}">&larr; Back to list of users</a>
{# if non-empty "user" dict is passed, edit existing user (whose params are in the dict), otherwise add new user #}
{% if user %}
<h2>Edit user</h2>
{% else %}
<h2>Add new user</h2>
{% endif %}
<form action="{{ url_for("edit_user", userid=user.kcid) if user else url_for("add_user") }}" method="POST">
{% if form_user.errors %}
<ul class="errors">
{% for field, errors in form_user.errors.items() %}
<li>{{ form_user[field].label if field else "" }}: {{ ' | '.join(errors) }}</li>
{% endfor %}
</ul>
{% endif %}
{{ form_user.csrf_token }}
{{ form_user.username.label }} {{ form_user.username(size=20, readonly=True if user else False) }}<br>
{{ form_user.firstname.label }} {{ form_user.firstname(size=20) }}<br>
{{ form_user.lastname.label }} {{ form_user.lastname(size=20) }}<br>
{{ form_user.cn.label }} {{ form_user.cn(size=20) }}<br>
{{ form_user.email.label }} {{ form_user.email(size=20) }}<br>
<input type="submit" value="{{"Update user" if user else "Add user"}}">
</form>
{% endblock %}
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>SOCtools user management</title>
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='style.css') }}">
</head>
<body>
{# Flash messages #}
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
<div class="flashes">
{% for category, message in messages %}
<li class="flash-{{category}}">{{ message }}</li>
{% endfor %}
</div>
{% endif %}
{% endwith %}
<h1>SOCtools - User management</h1>
{% block body %}
{% endblock %}
</body>
</html>
\ No newline at end of file
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>SOCtools user management</title>
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='style.css') }}">
</head>
<body>
{# Flash messages #}
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
<div class="flashes">
{% for category, message in messages %}
<li class="flash-{{category}}">{{ message }}</li>
{% endfor %}
</div>
{% endif %}
{% endwith %}
{% extends "base.html" %}
{% block body %}
<h1>SOCtools - User management</h1>
<p><a href="{{ url_for("add_user") }}">Add new user ...</a></p>
<table>
<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created</th><th></th>
<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th></th>
{% for user in users %}
<tr>
<td>{{ user.username }}</td>
<td>{{ user.firstName }}</td>
<td>{{ user.lastName }}</td>
<td>{{ user.firstname }}</td>
<td>{{ user.lastname }}</td>
<td>{{ user.email }}</td>
<td>{{ user.attributes.CN[0] }}</td>
<td>{{ user.attributes.DN[0] }}</td>
<td>{{ (user.createdTimestamp/1000)|ts_to_str }}</td>
<td><a href="{{ url_for('delete_user', userid=user.id) }}" title="Delete user"
onclick="return confirm('Are you sure you want to permanently delete user account &quot;{{user.username}}&quot; ({{user.attributes.CN[0]}}, {{user.email}})?')">&#128465;</a></td>
<td>{{ user.cn }}</td>
<td>{{ user.dn }}</td>
<td>{{ user.ts_created.isoformat() }}</td>
<td>
<a href="{{ url_for('edit_user', userid=user.kcid) }}" title="Edit user">&#128393;</a>
<a href="{{ url_for('delete_user', userid=user.kcid) }}" title="Delete user"
onclick="return confirm('Are you sure you want to permanently delete user account &quot;{{user.username}}&quot; ({{user.cn}}, {{user.email}})?')">&#128465;</a>
</td>
</tr>
{#<tr><td colspan=8>{{ user }}</td></tr>#}
{% endfor %}
</table>
<h2>Add new user</h2>
<form action="{{ url_for("main") }}" method="POST">
{% if form_add_user.errors %}
<ul class="errors">
{% for field, errors in form_add_user.errors.items() %}
<li>{{ form_add_user[field].label if field else "" }}: {{ ' | '.join(errors) }}</li>
{% endfor %}
</ul>
{% endif %}
{{ form_add_user.csrf_token }}
{{ form_add_user.username.label }} {{ form_add_user.username(size=20) }}<br>
{{ form_add_user.firstname.label }} {{ form_add_user.firstname(size=20) }}<br>
{{ form_add_user.lastname.label }} {{ form_add_user.lastname(size=20) }}<br>
{{ form_add_user.cn.label }} {{ form_add_user.cn(size=20) }}<br>
{{ form_add_user.email.label }} {{ form_add_user.email(size=20) }}<br>
<input type="submit" value="Add user">
</form>
</body>
</html>
\ No newline at end of file
{% endblock %}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment