#!/usr/bin/env python3 import sys from datetime import datetime, timezone import os.path import re from typing import List, Dict, Optional from flask import Flask, render_template, request, make_response, redirect, flash from flask_wtf import FlaskForm from wtforms import StringField from wtforms.validators import DataRequired, Email import requests import yaml from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6 from nifi import * app = Flask(__name__) app.secret_key = "ASDF1234 - CHANGE ME!" # *** Configuration of file paths *** SOCTOOLS_BASE = ".." # path to the root of soctools files VARIABLES_FILE = os.path.join(SOCTOOLS_BASE, "group_vars/all/variables.yml") CA_CERT_FILE = os.path.join(SOCTOOLS_BASE, "secrets/CA/ca.crt") KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/keykloak_admin") # Note: should be keycloak, not keykloak @app.before_first_request def load_config(): """Load various variables, api keys, etc. and set configuration parameters""" global SOCTOOLSPROXY, KEYCLOAK_BASE_URL, KEYCLOAK_USERS_URL, KEYCLOAK_ADMIN_PASSWORD variables = yaml.safe_load(open(VARIABLES_FILE, "r")) # Get FQDN of the main server SOCTOOLSPROXY = variables["soctoolsproxy"] assert re.match('[a-zA-Z0-9.-]+', SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{VARIABLES_FILE}' is not a valid domain name." # Set base URL to Keycloak KEYCLOAK_BASE_URL = f"https://{SOCTOOLSPROXY}:12443" KEYCLOAK_USERS_URL = KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users" # Load API key for Keycloak KEYCLOAK_ADMIN_PASSWORD = open(KEYCLOAK_ADMIN_PASSWORD_FILE, "r").read(100).strip() # read max 100 B, the key should never be so long print(f"Config loaded:\nSOCTOOLSPROXY={SOCTOOLSPROXY}\nKEYCLOAK_BASE_URL={KEYCLOAK_BASE_URL}\n" f"KEYCLOAK_ADMIN_PASSWORD={KEYCLOAK_ADMIN_PASSWORD[:3]}...{KEYCLOAK_ADMIN_PASSWORD[-4:]}") # *** Custom Jinja filters *** def ts_to_str(ts: float) -> str: return datetime.utcfromtimestamp(int(ts)).isoformat(sep=" ") # TODO Do Keycloak really use UTC timestamps? app.jinja_env.filters["ts_to_str"] = ts_to_str # *** Our user representation *** @dataclass class UserAccount: username: str email: str firstname: str lastname: str cn: str dn: str kcid: Optional[str] = field(default=None) # keycloak ID ts_created: Optional[datetime] = field(default=None) # timezone-aware datetime in UTC components: Optional[Dict[str, bool]] = field(default_factory=dict) # Presence of the account in SOCtools components that don't use Keycloak directly (NiFi, MISP, TheHive, ...) def to_keycloak_representation(self) -> Dict: """ Create Keycloak representation of user account information Ref: https://www.keycloak.org/docs-api/12.0/rest-api/#_userrepresentation """ return { "id": self.kcid, "username": self.username, "firstName": self.firstname, "lastName": self.lastname, "email": self.email, "attributes": { "CN": [self.cn], "DN": [f"CN={self.cn}"] }, } @classmethod def from_keycloak_representation(cls, kc_user: dict) -> "UserAccount": try: return cls(kc_user['username'], kc_user['email'], kc_user['firstName'], kc_user['lastName'], kc_user['attributes'].get('CN',[''])[0], kc_user['attributes'].get('DN',[''])[0], kc_user['id'], datetime.utcfromtimestamp(int(kc_user['createdTimestamp']/1000)).replace(tzinfo=timezone.utc)) except KeyError as e: raise KeycloakError(f"User representation received from Keycloak is missing attribute '{e}'") # *** Functions to call other APIs *** class KeycloakError(Exception): pass def kc_get_token() -> str: """ Get admin's OIDC token from Keycloak - needed to perform any administrative API call Return the token or raise KeycloakError """ url = KEYCLOAK_BASE_URL + "/auth/realms/master/protocol/openid-connect/token" data = { "client_id": "admin-cli", "username": "admin", "password": KEYCLOAK_ADMIN_PASSWORD, "grant_type": "password" } try: resp = requests.post(url, data, verify=CA_CERT_FILE) if resp.status_code != 200: raise KeycloakError(f"Can't get OIDC token for API access: ({resp.status_code}) {resp.text[:200]}") return str(resp.json()['access_token']) except Exception as e: raise KeycloakError(f"Can't get OIDC token for API access: {type(e).__name__}: {e}") def kc_get_users() -> List[UserAccount]: """ Get list of users from Keycloak :return List of UserAccount objects :raise KeycloakError """ token = kc_get_token() resp = requests.get(KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"Can't get list of users: ({resp.status_code}) {resp.text[:200]}") try: users = resp.json() assert isinstance(users, list) and all(isinstance(o, dict) for o in users), "" return [UserAccount.from_keycloak_representation(u) for u in users] except (ValueError, AssertionError): raise KeycloakError("Can't get list of users: Unexpected content of response from Keycloak") def kc_get_user_by_id(userid: str) -> UserAccount: """ Get details of specified user account from Keycloak :param userid: Keycloak user ID (not username) :return UserAccount representation of the user :raise KeycloakError """ assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID" token = kc_get_token() url = KEYCLOAK_USERS_URL + "/" + userid resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}") try: user = resp.json() assert isinstance(user, dict), "" return UserAccount.from_keycloak_representation(user) except (ValueError, AssertionError): raise KeycloakError(f"Can't get user info: Unexpected content of response from Keycloak") def kc_get_user_by_name(username: str) -> Optional[UserAccount]: """ Get details of specified user account from Keycloak :param username: Keycloak username (not ID) :return UserAccount representation of the user or None is user not found :raise KeycloakError """ token = kc_get_token() url = KEYCLOAK_USERS_URL resp = requests.get(url, params={'username': username, 'exact': 'true'}, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}") print(resp.text) try: users = resp.json() assert isinstance(users, list), "" except (ValueError, AssertionError): raise KeycloakError(f"Can't get user info: Unexpected content of response from Keycloak") if len(users) == 0: raise KeycloakError(f"No user with username '{username}'") return UserAccount.from_keycloak_representation(users[0]) def kc_add_user(user: UserAccount) -> None: """Add a new user to Keycloak :return None :raises KeycloakError """ token = kc_get_token() user_data = user.to_keycloak_representation() user_data["enabled"] = True # add "enable" key, since a new user must be explicitly enabled (default is False) resp = requests.post(KEYCLOAK_USERS_URL, json=user_data, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") def kc_update_user(user: UserAccount) -> None: """Update an existing user in Keycloak :param user: UserAccount instance with "kcid" filled :return None :raises KeycloakError """ token = kc_get_token() user_data = user.to_keycloak_representation() url = KEYCLOAK_USERS_URL + "/" + user.kcid resp = requests.put(url, json=user_data, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") def kc_delete_user(userid: str) -> None: """Delete a user from Keycloak :param userid: Keycloak user ID (not username) :return None :raise KeycloakError """ assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID" token = kc_get_token() url = KEYCLOAK_USERS_URL + "/" + userid resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) if not resp.ok: raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") # *** NiFi *** # *** Flask endpoints and forms *** class AddUserForm(FlaskForm): username = StringField("Username", validators=[DataRequired()]) cn = StringField("Common name (CN)", validators=[DataRequired()]) firstname = StringField("First name", validators=[]) lastname = StringField("Last name", validators=[]) email = StringField("Email", validators=[DataRequired(), Email()]) # DN is constructed automatically from CN @app.route("/") def main(): # Load existing users from Keycloak try: users = kc_get_users() except KeycloakError as e: flash(f"ERROR: {e}", "error") users = [] #print(users) # Load NiFi users try: nifi_users = nifi_get_users() except NifiError as e: flash(f"ERROR: {e}", "error") nifi_users = [] nifi_usernames = set(nu["name"] for nu in nifi_users) return render_template("main.html", **locals()) @app.route("/add_user", methods=["GET", "POST"]) def add_user(): """Add a new user. On GET show the new-user page, on POST create new user account.""" form_user = AddUserForm() if form_user.validate_on_submit(): # Form submitted and valid - create user account user = UserAccount(username=form_user.username.data, email=form_user.email.data, firstname=form_user.firstname.data, lastname=form_user.lastname.data, cn=form_user.cn.data, dn=f"CN={form_user.cn.data}") # Keycloak try: kc_add_user(user) flash(f'User "{form_user.username.data}" successfully created in Keycloak.', "success") except Exception as e: flash(f'Error when creating user in Keycloak: {e}', "error") # NiFi try: nifi_add_user(user) flash(f'User "{form_user.username.data}" successfully created in NiFi.', "success") except NifiUserExistsError: flash(f'User "{user.username}" already exists in NiFi, nothing has changed.', "warning") except Exception as e: flash(f'Error when creating user in NiFi: {e}', "error") return redirect("/") # Success - go back to main page return render_template("add_edit_user.html", form_user=form_user, user=None) @app.route("/edit_user/<username>", methods=["GET", "POST"]) def edit_user(username: str): """Edit existing user. On GET show user details, on POST update user params with new values.""" keycloak_id = kc_get_user_by_name(username).kcid # TODO catch exception if request.method == "POST": form_user = AddUserForm() # use data from POST request if form_user.validate_on_submit(): # Form submitted and valid - perform account update user = UserAccount(username=form_user.username.data, email=form_user.email.data, firstname=form_user.firstname.data, lastname=form_user.lastname.data, cn=form_user.cn.data, dn=f"CN={form_user.cn.data}", kcid=keycloak_id) try: kc_update_user(user) flash(f'User "{form_user.username.data}" successfully updated.', "success") return redirect("/") # Success - go back to main page except KeycloakError as e: flash(f'Error when updating user: {e}', "error") return render_template("add_edit_user.html", form_user=form_user, user={"kcid": keycloak_id}) # else - method="GET" try: user = kc_get_user_by_id(keycloak_id) except KeycloakError as e: flash(f'ERROR: {e}', "error") return redirect('/') form_user = AddUserForm(obj=user) return render_template("add_edit_user.html", form_user=form_user, user=user) @app.route("/delete_user/<username>") def delete_user(username: str): """Delete user given by username and redirect back to main page""" try: keycloak_id = kc_get_user_by_name(username).kcid kc_delete_user(keycloak_id) flash('User successfully deleted from KeyCloak.', "success") except KeycloakError as e: flash(f'Error when deleting user from KeyCloak: {e}', "error") try: nifi_delete_user(username) flash(f'User "{username}" successfully deleted from NiFi.', "success") except NifiUserNotFoundError: flash(f'User "{username}" was not found in NiFi, nothing has changed.', "warning") except NifiError as e: flash(f'Error when deleting user from NiFi: {e}', "error") return redirect("/") # TODO certificates?? # TODO other services (besides Keycloak) # TODO authentication/authorization to this GUI # When the script is run directly, run the application on a local development server. # Optionally pass two parameters, 'host' (IP to listen on) and 'port', # e.g.: ./main.py 0.0.0.0 8080 if __name__ == '__main__': host, port = '127.0.0.1', 5000 # defaults if len(sys.argv) > 2: host = sys.argv[1] port = int(sys.argv[2]) app.config['ENV'] = 'development' app.run(host=host, port=port, debug=True)