diff --git a/certificates.py b/certificates.py new file mode 100644 index 0000000000000000000000000000000000000000..818fad4490806fcf5fb78d9a7318013c33d6a2a3 --- /dev/null +++ b/certificates.py @@ -0,0 +1,214 @@ +""" +Functions to manage user certificates + +User authentication within SOCtools is mandated by x509 certificates issued by an internal SOCtools CA. +The CA is based on the Easy-RSA tool which is used to manage everything around the certificates +(all its config and data is in "<SOCTOOLS_BASE>/secrets/CA/"). + +When a new user is created, a private key and a certificate with his/her username as CN are generated by Easy-RSA +in the background (stored into ".../CA/issued/<username>.crt"). # TODO check +The user is emailed with an URL containing a unique token allowing him/her to export and download the certificate and +private key in the .p12 format. # TODO other formats? + +The linkage of these tokens to usernames is stored in a local file at: "<SOCTOOLS_BASE>/secrets/cert_access_tokens" +Format: "token,username,expiration-time" per line +""" +import io +from typing import List, Dict, Tuple, Optional +from typing.io import BinaryIO +import sys +import os.path +import subprocess +import random +import string +from datetime import datetime, timedelta + + +import config + +# Path to 'easyrsa' executable +#TODO: don't use the one from repository, should be installed somewhere +EASYRSA = os.path.join(config.SOCTOOLS_BASE, "roles/ca/files/easyrsa/easyrsa") + +# Environment variables to pass to easyrsa +EASYRSA_ENV = { + "EASYRSA_BATCH": "1", + "EASYRSA_PKI": config.CA_DIR, +} + +TOKEN_FILE = os.path.join(config.SOCTOOLS_BASE, "secrets/cert_access_tokens") +TOKEN_FILE_HEADER = """ +# Unique tokens allowing users to download the certificates generated for them via the user-mgmt-ui. +# !! AUTOMATICALLY GENERATED, DON'T EDIT !! (unless you know what you are doing) +""".lstrip() +TOKEN_EXPIRATION_HOURS = 24 + +EMAIL_SUBJECT = "[SOCtools] New account for {name}" +EMAIL_TEMPLATE = """ +Dear {name}, + +a user account has been created for you in the SOCtools system running at {base_url}. + +Your username: {username} + +You can authenticate to the system using a personal certificate which has been created for you. +Please, download the certificate from the following link and install it into your browser: + +{access_url} + +Don't share the link with anyone! It allows to download your certificate, including the private key. + +The link will expire in {} hours. +""" + + +class CertError(Exception): + pass + + +# ========================= +# Public interface + +def generate_certificate(cn: str): + """Generate a new x509 certificate for given user. + + The certificate and associated private key are stored as file in the Easy-RSA directory: + - cert (pem): <CA_DIR>/issued/<cn>.crt + - key (pem): <CA_DIR>/private/<cn>.key + - both (p12): <CA_DIR>/private/<cn>.p12 + + :param cn: CN (common name) to fill in the certificate + """ + _check_cn(cn) + # Create new key+cert (stored in PEM format) + cmd = [EASYRSA, "build-client-full", cn, "nopass"] + print(f"Running command: {' '.join(cmd)}") + result = subprocess.run(cmd, env=EASYRSA_ENV, stderr=subprocess.PIPE, encoding="ascii", errors="backslashreplace") + if result.returncode != 0: + raise CertError(f"Can't create a certificate for '{cn}': {result.stderr[:500]}") + + +# TODO: do the same for PEM? It would allow to download encrypted PEM-formatted key, but otherwise it's not needed +def export_p12_certificate(cn: str, password: str="") -> BinaryIO: + """ + Export user's certificate+key into PKCS12 (.p12) file encrypted by given password + + :param cn: CN (common name) identifying the certificate + :param password: Password to encrypt the file + :return: Path to temporary .p12 file, should be removed by caller after use + """ + _check_cn(cn) + # Export cert+key to a temporary file (using openssl, since easy-rsa doesn't allow to pass password on command line) + out_file = os.path.join(config.CA_DIR, f"~tmp.{cn}.p12") + crt_file, key_file = get_pem_files(cn) + cmd = ["openssl", "pkcs12", "-export", "-out", out_file, "-in", crt_file, "-inkey", key_file, "-passout", "pass:"+password] + print(f"Running command: {' '.join(cmd)}") + result = subprocess.run(cmd, stderr=subprocess.PIPE, encoding="ascii", errors="backslashreplace") + if result.returncode != 0: + raise CertError(f"Can't export p12 certificate for '{cn}': {result.stderr[:500]}") + # Open the file, delete it (i.e. remove filesystem entry) and return the opened file-like object (the file is still + # available until closed; it should be closed automatically when no variable points to it) + out_file_opened = open(out_file, "rb") + os.remove(out_file) + return out_file_opened + + +def revoke_certificate(cn: str): + """Revoke a previously issued x509 certificate. + + :param cn: CN (common name) identifying the certificate + """ + _check_cn(cn) + raise NotImplementedError + # cmd = [{EASYRSA}, "TODO", cn] + # result = subprocess.run(cmd, env=EASYRSA_ENV, stderr=subprocess.PIPE) + # if result.returncode != 0: + # raise CertError(f"Can't revoke the certificate for '{cn}': {result.stderr[:500]}") + + +def get_pem_files(cn: str): + """Return path to the certificate (.crt) and key (.key) files of given user + + :param cn: CN (common name) identifying the certificate + :return: Tuple with *.cert and *.key files in PEM format + """ + _check_cn(cn) + return (os.path.join(config.CA_DIR, "issued", cn+".crt"), + os.path.join(config.CA_DIR, "private", cn+".key")) + + +def generate_access_token(cn: str) -> str: + """ + Generate and store a new access token (16 random chars) allowing the user to download his/her certificate. + + :param cn: username (CN from certificate) + :return the generated token + """ + token_mapping = _read_token_file() + # Generate 16 random characters (letters and numbers) + new_token = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16)) + # Check that the token doesn't exit, yet (just from principle, the chance is extremely small) + if new_token in token_mapping: + raise RuntimeError("Generated a random token that already exists! This should only happen once in 4,7e28" + "trials on average - either you're impossibly (un)lucky or something's wrong.") + # Set expiration time + exp_time = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRATION_HOURS) + # Add the new token, username and exp. time to the list and write it to the file + token_mapping[new_token] = (cn, exp_time) + _write_token_file(token_mapping) + # Return the token + return new_token + + +def get_username_by_token(token: str) -> Optional[str]: + """Return username for which given token was generated (or None if unknown token)""" + token_mapping = _read_token_file() + if token not in token_mapping: + return None # non-existent token + username, exp = token_mapping.get(token) + if datetime.utcnow() > exp: + return None # expired token + return username + + +# ========================= +# Auxiliary functions + +def _check_cn(cn): + """ + Check CN validity - it must be possible to store it as a filename, without changing directory + + :raise ValueError + """ + if (".." in cn or "/" in cn or # this would allow to access files outside the CA directory + cn == "" or len(cn) > 64): # don't allow empty or too long usernames + raise ValueError("Invalid username/common_name") + +def _read_token_file() -> Dict[str, Tuple[str, datetime]]: + """ + Load whole token file and return its contents as dict token->username + + If token file doesn't exist, create a new empty one. + """ + if not os.path.exists(TOKEN_FILE): + print(f"NOTICE: Certificate access token file ({TOKEN_FILE}) doesn't exist, will create a new empty one.", file=sys.stderr) + _write_token_file({}) + + token_mapping = {} + for line in open(TOKEN_FILE, "r"): + line = line.strip() + if line == "" or line.startswith("#"): + continue + token, username, expiration = line.split(",") + expiration = datetime.strptime(expiration, '%Y-%m-%dT%H:%M:%S') + token_mapping[token] = (username, expiration) + return token_mapping + + +def _write_token_file(token_mapping: Dict[str, Tuple[str, datetime]]): + """Load whole token file and return its contents as dict token->username""" + with open(TOKEN_FILE, "w") as f: + f.write(TOKEN_FILE_HEADER) + for token, (username, expiration) in token_mapping.items(): + f.write(f"{token},{username},{expiration.strftime('%Y-%m-%dT%H:%M:%S')}\n") + diff --git a/config.py b/config.py index 28f6429ef660085f64a8facbd1b4a927c29adcee..24bd560626a6376d63316a50a1e32cc34f34cdfd 100644 --- a/config.py +++ b/config.py @@ -6,6 +6,7 @@ import os.path # *** Configuration of file paths *** SOCTOOLS_BASE = ".." # path to the root of soctools files VARIABLES_FILE = os.path.join(SOCTOOLS_BASE, "group_vars/all/variables.yml") +CA_DIR = os.path.join(SOCTOOLS_BASE, "secrets/CA") CA_CERT_FILE = os.path.join(SOCTOOLS_BASE, "secrets/CA/ca.crt") KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/keykloak_admin") # Note: should be keycloak, not keykloak diff --git a/main.py b/main.py index 891b12927b31b4d6078b2bf1776787638635ff61..4b7b31fbcf3e4222d1039204f3a52b593d6c81a9 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import os.path import re from typing import List, Dict, Optional, Union -from flask import Flask, render_template, request, make_response, redirect, flash +from flask import Flask, render_template, request, make_response, redirect, flash, send_file from flask_wtf import FlaskForm from wtforms import StringField from wtforms.validators import DataRequired, Email @@ -15,6 +15,7 @@ import yaml from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6 import config +import certificates from nifi import * from misp import * @@ -426,7 +427,56 @@ def delete_user(username: str): return redirect("/") -# TODO certificates?? +@app.route("/export_certificate/") +def export_certificate(): + """ + Show the page allow certificate download, or provide the cert file, if "format" is given. + + Expects two parameters passed via URL: + - "token" (mandatory) - authentication token allowing to access the certificate of the associated user. + - "format" (optional) - which format to download ("p12", "pem-key", "pem-cert"); if not given, show html page to select + """ + # Authentication + token = request.args.get("token") + if not token: + return make_response("ERROR: No token passed", 403) + username = certificates.get_username_by_token(token) + if not username: + return make_response("ERROR: Invalid or expired token", 403) + + # If format is given, export and serve the certificate file + format = request.args.get("format", None) + if format == "p12": + pwd = request.args.get("password", "") + return send_file(certificates.export_p12_certificate(username, pwd), + attachment_filename=f"{username}.p12", mimetype="application/x-pkcs12") + elif format == "pem-cert": + return send_file(certificates.get_pem_files(username)[0], + attachment_filename=f"{username}.crt", mimetype="application/x-pem-file") + elif format == "pem-key": + return send_file(certificates.get_pem_files(username)[1], + attachment_filename=f"{username}.key", mimetype="application/x-pem-file") + # Otherwise show the HTML page + return render_template("export_certificate.html", username=username) # TODO + + +@app.route("/send_token/<username>") +def send_token(username: str): + #TODO + return make_response("TODO") + + +# TODO: +# (re)send cert-access token for existing user +# automatically create certificate when creating new user (optionally automatically send email with token) + + +@app.route("/test_cert/<func>") +def test_cert_endpoint(func): + # run any function from "certificates" module + result = str(getattr(certificates, func)(**request.args)) + return make_response(result) + # TODO other services (besides Keycloak) # - NiFi - DONE diff --git a/templates/main.html b/templates/main.html index c10fa1d81ab563d267b2f08a9501986560c8428b..52613fcdde21b4e24bb3cd0100b01da8e3d831f3 100644 --- a/templates/main.html +++ b/templates/main.html @@ -3,8 +3,11 @@ <p><a href="{{ url_for("add_user") }}">{{ icon('plus-circle', size="1em") }} Add new user ...</a></p> +{# TODO #} +<input type="checkbox" id="show-internal" onclick=""><label for="show-internal">Show internal (service) accounts</label> + <table> -<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th>NiFi</th><th>MISP</th><th></th> +<tr><th>Username</th><th>First name</th><th>Last name</th><th>Email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th>NiFi</th><th>MISP</th><th>Actions</th> {% for user in users %} <tr{% if user.internal %} class="internal-user"{% endif %}> <td>{{ user.username }}</td> @@ -19,6 +22,8 @@ <td> {% if not user.internal -%} <a href="{{ url_for('edit_user', username=user.username) }}" title="Edit user">{{ icon('pencil') }}</a> +<a href="{{ url_for('send_token', username=user.username) }}" title="Re-send email with token for certificate download" + onclick="return confirm('Send an email to "{{user.email}}" containing a unique URL allowing to download the user\'s certificate and private key?')">{{ icon('envelope') }}</a> <a href="{{ url_for('delete_user', username=user.username) }}" title="Delete user" onclick="return confirm('Are you sure you want to permanently delete user account "{{user.username}}" ({{user.cn}}, {{user.email}})?')">{{ icon('trash') }}</a> {%- endif %}