Skip to content
Snippets Groups Projects
Commit 864a4636 authored by Václav Bartoš's avatar Václav Bartoš
Browse files

possibility to add and delete users

parent 0376f33b
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@ from datetime import datetime
import os.path
import re
import subprocess
from typing import List,Dict
from flask import Flask, render_template, request, make_response, redirect, flash
from flask_wtf import FlaskForm
......@@ -25,13 +26,14 @@ KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/ke
@app.before_first_request
def load_config():
"""Load various variables, api keys, etc. and set configuration parameters"""
global SOCTOOLSPROXY, KEYCLOAK_BASE_URL, KEYCLOAK_ADMIN_PASSWORD
global SOCTOOLSPROXY, KEYCLOAK_BASE_URL, KEYCLOAK_USERS_URL, KEYCLOAK_ADMIN_PASSWORD
variables = yaml.safe_load(open(VARIABLES_FILE, "r"))
# Get FQDN of the main server
SOCTOOLSPROXY = variables["soctoolsproxy"]
assert re.match('[a-zA-Z0-9.-]+', SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{VARIABLES_FILE}' is not a valid domain name."
# Set base URL to Keycloak
KEYCLOAK_BASE_URL = f"https://{SOCTOOLSPROXY}:12443"
KEYCLOAK_USERS_URL = KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users"
# Load API key for Keycloak
KEYCLOAK_ADMIN_PASSWORD = open(KEYCLOAK_ADMIN_PASSWORD_FILE, "r").read(100).strip() # read max 100 B, the key should never be so long
print(f"Config loaded:\nSOCTOOLSPROXY={SOCTOOLSPROXY}\nKEYCLOAK_BASE_URL={KEYCLOAK_BASE_URL}\n"
......@@ -39,7 +41,7 @@ def load_config():
# *** Custom Jinja filters ***
def ts_to_str(ts):
def ts_to_str(ts: float) -> str:
return datetime.utcfromtimestamp(int(ts)).isoformat(sep=" ") # TODO Do Keycloak really use UTC timestamps?
app.jinja_env.filters["ts_to_str"] = ts_to_str
......@@ -47,8 +49,15 @@ app.jinja_env.filters["ts_to_str"] = ts_to_str
# *** Functions to call other APIs ***
def get_token():
"""Get admin's OIDC token from Keycloak - needed to perform any administrative API call"""
class KeycloakError(Exception):
pass
def kc_get_token() -> str:
"""
Get admin's OIDC token from Keycloak - needed to perform any administrative API call
Return the token or raise KeycloakError
"""
url = KEYCLOAK_BASE_URL + "/auth/realms/master/protocol/openid-connect/token"
data = {
"client_id": "admin-cli",
......@@ -59,32 +68,70 @@ def get_token():
try:
resp = requests.post(url, data, verify=CA_CERT_FILE)
if resp.status_code != 200:
flash(f"ERROR: Can't get token for API access: ({resp.status_code}) {resp.text[:200]}", "error")
return None
raise KeycloakError(f"Can't get OIDC token for API access: ({resp.status_code}) {resp.text[:200]}")
return str(resp.json()['access_token'])
except Exception as e:
flash(f"ERROR: Can't get token for API access: {type(e).__name__}: {e}", "error")
return None
def get_users():
# Get list of users from Keycloak
url = KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users"
token = get_token()
if token is None:
return [] # can't get token, error message is already flashed by get_token function
resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
raise KeycloakError(f"Can't get OIDC token for API access: {type(e).__name__}: {e}")
def kc_get_users() -> List[Dict]:
"""
Get list of users from Keycloak
:return List of dicts, one per user, with keys matching the Keycloak user representation:
https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation
:raise KeycloakError
"""
token = kc_get_token()
resp = requests.get(KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
flash(f"ERROR: Can't get list of users: ({resp.status_code}) {resp.text[:200]}", "error")
return []
raise KeycloakError("Can't get list of users: ({resp.status_code}) {resp.text[:200]}")
try:
users = resp.json()
assert isinstance(users, list) and all(isinstance(o, dict) for o in users), ""
except (ValueError, AssertionError):
flash(f"ERROR: Can't get list of users: Unexpected content of response from Keycloak", "error")
return []
raise KeycloakError(f"Can't get list of users: Unexpected content of response from Keycloak")
return users
def kc_add_user(username: str, firstname: str, lastname: str, cn: str, email: str) -> None:
"""Add a new user to Keycloak
:return None
:raises KeycloakError
"""
token = kc_get_token()
user_data = {
"username": username,
"firstName": firstname,
"lastName": lastname,
"email": email,
"attributes": {
"CN": [cn],
"DN": [f"CN={cn}"]
},
"enabled": True # user must be explicitly enabled, default is False
}
resp = requests.post(KEYCLOAK_USERS_URL, json=user_data,
headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
def kc_delete_user(userid: str) -> None:
"""Delete a user from Keycloak
:param userid: Keycloak user ID (not username)
:return None
:raise KeycloakError
"""
token = kc_get_token()
url = KEYCLOAK_USERS_URL + "/" + userid
resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE)
if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
# *** Flask endpoints and forms ***
class AddUserForm(FlaskForm):
......@@ -98,25 +145,39 @@ class AddUserForm(FlaskForm):
@app.route("/", methods=["GET", "POST"])
def main():
# Load existing users from Keycloak
users = get_users()
#print(users)
# Add user form
form_add_user = AddUserForm()
if form_add_user.validate_on_submit():
# TODO check that username doesn't exist, yet (and check validity, i.e. special characters etc.)
# TODO add user
result = subprocess.run(["echo", "test"])
if result.returncode == 0:
# Add user
try:
kc_add_user(form_add_user.username.data, form_add_user.firstname.data, form_add_user.lastname.data,
form_add_user.cn.data, form_add_user.email.data)
flash(f'User "{form_add_user.username.data}" successfully created.', "success")
else:
flash(f'Error when creating user: {result.stderr}', "error")
return redirect("/") # Force new load of the page using GET, so page refresh doesn't trigger new POST.
except KeycloakError as e:
flash(f'Error when creating user: {e}', "error")
# Load existing users from Keycloak
try:
users = kc_get_users()
except KeycloakError as e:
flash(f"ERROR: {e}", "error")
users = []
#print(users)
return render_template("main.html", **locals())
# TODO AJAX endpoint to delete user
@app.route("/delete_user/<userid>")
def delete_user(userid: str):
"""Delete user given by userid and redirect back to main page"""
try:
kc_delete_user(userid)
flash(f'User successfully deleted.', "success")
except KeycloakError as e:
flash(f'Error when deleting user: {e}', "error")
return redirect("/")
# TODO edit user? User detail page?
......
......@@ -31,8 +31,10 @@
<td>{{ user.attributes.CN[0] }}</td>
<td>{{ user.attributes.DN[0] }}</td>
<td>{{ (user.createdTimestamp/1000)|ts_to_str }}</td>
<td>...</td>
<td><a href="{{ url_for('delete_user', userid=user.id) }}" title="Delete user"
onclick="return confirm('Are you sure you want to permanently delete user account &quot;{{user.username}}&quot; ({{user.attributes.CN[0]}}, {{user.email}})?')">&#128465;</a></td>
</tr>
{#<tr><td colspan=8>{{ user }}</td></tr>#}
{% endfor %}
</table>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment