From 02bcbd8332e0609138010c387513d2dfb14f3748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Barto=C5=A1?= <bartos@cesnet.cz> Date: Sun, 24 Jul 2022 19:05:51 +0200 Subject: [PATCH] Initial support for certificate management (incomplete) Most of the functions needed to generate certificates and access them using unique tokens are implemented. Integration with the rest of the system, as well as sending emails, is still missing. --- certificates.py | 214 ++++++++++++++++++++++++++++++++++++++++++++ config.py | 1 + main.py | 54 ++++++++++- templates/main.html | 7 +- 4 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 certificates.py diff --git a/certificates.py b/certificates.py new file mode 100644 index 0000000..818fad4 --- /dev/null +++ b/certificates.py @@ -0,0 +1,214 @@ +""" +Functions to manage user certificates + +User authentication within SOCtools is mandated by x509 certificates issued by an internal SOCtools CA. +The CA is based on the Easy-RSA tool which is used to manage everything around the certificates +(all its config and data is in "<SOCTOOLS_BASE>/secrets/CA/"). + +When a new user is created, a private key and a certificate with his/her username as CN are generated by Easy-RSA +in the background (stored into ".../CA/issued/<username>.crt"). # TODO check +The user is emailed with an URL containing a unique token allowing him/her to export and download the certificate and +private key in the .p12 format. # TODO other formats? + +The linkage of these tokens to usernames is stored in a local file at: "<SOCTOOLS_BASE>/secrets/cert_access_tokens" +Format: "token,username,expiration-time" per line +""" +import io +from typing import List, Dict, Tuple, Optional +from typing.io import BinaryIO +import sys +import os.path +import subprocess +import random +import string +from datetime import datetime, timedelta + + +import config + +# Path to 'easyrsa' executable +#TODO: don't use the one from repository, should be installed somewhere +EASYRSA = os.path.join(config.SOCTOOLS_BASE, "roles/ca/files/easyrsa/easyrsa") + +# Environment variables to pass to easyrsa +EASYRSA_ENV = { + "EASYRSA_BATCH": "1", + "EASYRSA_PKI": config.CA_DIR, +} + +TOKEN_FILE = os.path.join(config.SOCTOOLS_BASE, "secrets/cert_access_tokens") +TOKEN_FILE_HEADER = """ +# Unique tokens allowing users to download the certificates generated for them via the user-mgmt-ui. +# !! AUTOMATICALLY GENERATED, DON'T EDIT !! (unless you know what you are doing) +""".lstrip() +TOKEN_EXPIRATION_HOURS = 24 + +EMAIL_SUBJECT = "[SOCtools] New account for {name}" +EMAIL_TEMPLATE = """ +Dear {name}, + +a user account has been created for you in the SOCtools system running at {base_url}. + +Your username: {username} + +You can authenticate to the system using a personal certificate which has been created for you. +Please, download the certificate from the following link and install it into your browser: + +{access_url} + +Don't share the link with anyone! It allows to download your certificate, including the private key. + +The link will expire in {} hours. +""" + + +class CertError(Exception): + pass + + +# ========================= +# Public interface + +def generate_certificate(cn: str): + """Generate a new x509 certificate for given user. + + The certificate and associated private key are stored as file in the Easy-RSA directory: + - cert (pem): <CA_DIR>/issued/<cn>.crt + - key (pem): <CA_DIR>/private/<cn>.key + - both (p12): <CA_DIR>/private/<cn>.p12 + + :param cn: CN (common name) to fill in the certificate + """ + _check_cn(cn) + # Create new key+cert (stored in PEM format) + cmd = [EASYRSA, "build-client-full", cn, "nopass"] + print(f"Running command: {' '.join(cmd)}") + result = subprocess.run(cmd, env=EASYRSA_ENV, stderr=subprocess.PIPE, encoding="ascii", errors="backslashreplace") + if result.returncode != 0: + raise CertError(f"Can't create a certificate for '{cn}': {result.stderr[:500]}") + + +# TODO: do the same for PEM? It would allow to download encrypted PEM-formatted key, but otherwise it's not needed +def export_p12_certificate(cn: str, password: str="") -> BinaryIO: + """ + Export user's certificate+key into PKCS12 (.p12) file encrypted by given password + + :param cn: CN (common name) identifying the certificate + :param password: Password to encrypt the file + :return: Path to temporary .p12 file, should be removed by caller after use + """ + _check_cn(cn) + # Export cert+key to a temporary file (using openssl, since easy-rsa doesn't allow to pass password on command line) + out_file = os.path.join(config.CA_DIR, f"~tmp.{cn}.p12") + crt_file, key_file = get_pem_files(cn) + cmd = ["openssl", "pkcs12", "-export", "-out", out_file, "-in", crt_file, "-inkey", key_file, "-passout", "pass:"+password] + print(f"Running command: {' '.join(cmd)}") + result = subprocess.run(cmd, stderr=subprocess.PIPE, encoding="ascii", errors="backslashreplace") + if result.returncode != 0: + raise CertError(f"Can't export p12 certificate for '{cn}': {result.stderr[:500]}") + # Open the file, delete it (i.e. remove filesystem entry) and return the opened file-like object (the file is still + # available until closed; it should be closed automatically when no variable points to it) + out_file_opened = open(out_file, "rb") + os.remove(out_file) + return out_file_opened + + +def revoke_certificate(cn: str): + """Revoke a previously issued x509 certificate. + + :param cn: CN (common name) identifying the certificate + """ + _check_cn(cn) + raise NotImplementedError + # cmd = [{EASYRSA}, "TODO", cn] + # result = subprocess.run(cmd, env=EASYRSA_ENV, stderr=subprocess.PIPE) + # if result.returncode != 0: + # raise CertError(f"Can't revoke the certificate for '{cn}': {result.stderr[:500]}") + + +def get_pem_files(cn: str): + """Return path to the certificate (.crt) and key (.key) files of given user + + :param cn: CN (common name) identifying the certificate + :return: Tuple with *.cert and *.key files in PEM format + """ + _check_cn(cn) + return (os.path.join(config.CA_DIR, "issued", cn+".crt"), + os.path.join(config.CA_DIR, "private", cn+".key")) + + +def generate_access_token(cn: str) -> str: + """ + Generate and store a new access token (16 random chars) allowing the user to download his/her certificate. + + :param cn: username (CN from certificate) + :return the generated token + """ + token_mapping = _read_token_file() + # Generate 16 random characters (letters and numbers) + new_token = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16)) + # Check that the token doesn't exit, yet (just from principle, the chance is extremely small) + if new_token in token_mapping: + raise RuntimeError("Generated a random token that already exists! This should only happen once in 4,7e28" + "trials on average - either you're impossibly (un)lucky or something's wrong.") + # Set expiration time + exp_time = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRATION_HOURS) + # Add the new token, username and exp. time to the list and write it to the file + token_mapping[new_token] = (cn, exp_time) + _write_token_file(token_mapping) + # Return the token + return new_token + + +def get_username_by_token(token: str) -> Optional[str]: + """Return username for which given token was generated (or None if unknown token)""" + token_mapping = _read_token_file() + if token not in token_mapping: + return None # non-existent token + username, exp = token_mapping.get(token) + if datetime.utcnow() > exp: + return None # expired token + return username + + +# ========================= +# Auxiliary functions + +def _check_cn(cn): + """ + Check CN validity - it must be possible to store it as a filename, without changing directory + + :raise ValueError + """ + if (".." in cn or "/" in cn or # this would allow to access files outside the CA directory + cn == "" or len(cn) > 64): # don't allow empty or too long usernames + raise ValueError("Invalid username/common_name") + +def _read_token_file() -> Dict[str, Tuple[str, datetime]]: + """ + Load whole token file and return its contents as dict token->username + + If token file doesn't exist, create a new empty one. + """ + if not os.path.exists(TOKEN_FILE): + print(f"NOTICE: Certificate access token file ({TOKEN_FILE}) doesn't exist, will create a new empty one.", file=sys.stderr) + _write_token_file({}) + + token_mapping = {} + for line in open(TOKEN_FILE, "r"): + line = line.strip() + if line == "" or line.startswith("#"): + continue + token, username, expiration = line.split(",") + expiration = datetime.strptime(expiration, '%Y-%m-%dT%H:%M:%S') + token_mapping[token] = (username, expiration) + return token_mapping + + +def _write_token_file(token_mapping: Dict[str, Tuple[str, datetime]]): + """Load whole token file and return its contents as dict token->username""" + with open(TOKEN_FILE, "w") as f: + f.write(TOKEN_FILE_HEADER) + for token, (username, expiration) in token_mapping.items(): + f.write(f"{token},{username},{expiration.strftime('%Y-%m-%dT%H:%M:%S')}\n") + diff --git a/config.py b/config.py index 28f6429..24bd560 100644 --- a/config.py +++ b/config.py @@ -6,6 +6,7 @@ import os.path # *** Configuration of file paths *** SOCTOOLS_BASE = ".." # path to the root of soctools files VARIABLES_FILE = os.path.join(SOCTOOLS_BASE, "group_vars/all/variables.yml") +CA_DIR = os.path.join(SOCTOOLS_BASE, "secrets/CA") CA_CERT_FILE = os.path.join(SOCTOOLS_BASE, "secrets/CA/ca.crt") KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/keykloak_admin") # Note: should be keycloak, not keykloak diff --git a/main.py b/main.py index 891b129..4b7b31f 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import os.path import re from typing import List, Dict, Optional, Union -from flask import Flask, render_template, request, make_response, redirect, flash +from flask import Flask, render_template, request, make_response, redirect, flash, send_file from flask_wtf import FlaskForm from wtforms import StringField from wtforms.validators import DataRequired, Email @@ -15,6 +15,7 @@ import yaml from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6 import config +import certificates from nifi import * from misp import * @@ -426,7 +427,56 @@ def delete_user(username: str): return redirect("/") -# TODO certificates?? +@app.route("/export_certificate/") +def export_certificate(): + """ + Show the page allow certificate download, or provide the cert file, if "format" is given. + + Expects two parameters passed via URL: + - "token" (mandatory) - authentication token allowing to access the certificate of the associated user. + - "format" (optional) - which format to download ("p12", "pem-key", "pem-cert"); if not given, show html page to select + """ + # Authentication + token = request.args.get("token") + if not token: + return make_response("ERROR: No token passed", 403) + username = certificates.get_username_by_token(token) + if not username: + return make_response("ERROR: Invalid or expired token", 403) + + # If format is given, export and serve the certificate file + format = request.args.get("format", None) + if format == "p12": + pwd = request.args.get("password", "") + return send_file(certificates.export_p12_certificate(username, pwd), + attachment_filename=f"{username}.p12", mimetype="application/x-pkcs12") + elif format == "pem-cert": + return send_file(certificates.get_pem_files(username)[0], + attachment_filename=f"{username}.crt", mimetype="application/x-pem-file") + elif format == "pem-key": + return send_file(certificates.get_pem_files(username)[1], + attachment_filename=f"{username}.key", mimetype="application/x-pem-file") + # Otherwise show the HTML page + return render_template("export_certificate.html", username=username) # TODO + + +@app.route("/send_token/<username>") +def send_token(username: str): + #TODO + return make_response("TODO") + + +# TODO: +# (re)send cert-access token for existing user +# automatically create certificate when creating new user (optionally automatically send email with token) + + +@app.route("/test_cert/<func>") +def test_cert_endpoint(func): + # run any function from "certificates" module + result = str(getattr(certificates, func)(**request.args)) + return make_response(result) + # TODO other services (besides Keycloak) # - NiFi - DONE diff --git a/templates/main.html b/templates/main.html index c10fa1d..52613fc 100644 --- a/templates/main.html +++ b/templates/main.html @@ -3,8 +3,11 @@ <p><a href="{{ url_for("add_user") }}">{{ icon('plus-circle', size="1em") }} Add new user ...</a></p> +{# TODO #} +<input type="checkbox" id="show-internal" onclick=""><label for="show-internal">Show internal (service) accounts</label> + <table> -<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th>NiFi</th><th>MISP</th><th></th> +<tr><th>Username</th><th>First name</th><th>Last name</th><th>Email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th>NiFi</th><th>MISP</th><th>Actions</th> {% for user in users %} <tr{% if user.internal %} class="internal-user"{% endif %}> <td>{{ user.username }}</td> @@ -19,6 +22,8 @@ <td> {% if not user.internal -%} <a href="{{ url_for('edit_user', username=user.username) }}" title="Edit user">{{ icon('pencil') }}</a> +<a href="{{ url_for('send_token', username=user.username) }}" title="Re-send email with token for certificate download" + onclick="return confirm('Send an email to "{{user.email}}" containing a unique URL allowing to download the user\'s certificate and private key?')">{{ icon('envelope') }}</a> <a href="{{ url_for('delete_user', username=user.username) }}" title="Delete user" onclick="return confirm('Are you sure you want to permanently delete user account "{{user.username}}" ({{user.cn}}, {{user.email}})?')">{{ icon('trash') }}</a> {%- endif %} -- GitLab