-
Václav Bartoš authored
There's no reason to make them different and this way it simplifies things (for example EasyRSA names certificate files by CN, but we usually identify user by its username).
Václav Bartoš authoredThere's no reason to make them different and this way it simplifies things (for example EasyRSA names certificate files by CN, but we usually identify user by its username).
certificates.py 8.26 KiB
"""
Functions to manage user certificates
User authentication within SOCtools is mandated by x509 certificates issued by an internal SOCtools CA.
The CA is based on the Easy-RSA tool which is used to manage everything around the certificates
(all its config and data is in "<SOCTOOLS_BASE>/secrets/CA/").
When a new user is created, a private key and a certificate with his/her username as CN are generated by Easy-RSA
in the background (stored into ".../CA/issued/<CN>.crt").
The user is emailed with a URL containing a unique token allowing him/her to export and download the certificate and
private key in the PKCS12 (.p12) and PEM (.key, .crt) formats.
The linkage of these tokens to usernames is stored in a local file at: "<SOCTOOLS_BASE>/secrets/cert_access_tokens"
Format: "token,username,expiration-time" per line
"""
import io
from typing import List, Dict, Tuple, Optional
from typing.io import BinaryIO
import sys
import os.path
import subprocess
import random
import string
from datetime import datetime, timedelta
import config
# Path to 'easyrsa' executable
#TODO: don't use the one from repository, should be installed somewhere
EASYRSA = os.path.join(config.SOCTOOLS_BASE, "roles/ca/files/easyrsa/easyrsa")
# Environment variables to pass to easyrsa
# (this dict is used as the *complete* environment of the easyrsa subprocess in generate_certificate)
# NOTE(review): EASYRSA_BATCH presumably switches Easy-RSA to non-interactive mode and EASYRSA_PKI presumably selects
# the PKI directory - confirm against the Easy-RSA documentation
EASYRSA_ENV = {
"EASYRSA_BATCH": "1",
"EASYRSA_PKI": config.CA_DIR,
}
# File storing the token -> (username, expiration time) linkage, one "token,username,expiration-time" record per line
TOKEN_FILE = os.path.join(config.SOCTOOLS_BASE, "secrets/cert_access_tokens")
# Comment header written at the top of TOKEN_FILE each time the file is rewritten
# (lstrip() removes the newline that follows the opening triple quote)
TOKEN_FILE_HEADER = """
# Unique tokens allowing users to download the certificates generated for them via the user-mgmt-ui.
# !! AUTOMATICALLY GENERATED, DON'T EDIT !! (unless you know what you are doing)
""".lstrip()
# Lifetime of a newly generated access token, in hours (added to the current UTC time in generate_access_token)
TOKEN_EXPIRATION_HOURS = 24
# Subject and body templates of the e-mail announcing a new account.
# They are not used by the code visible in this part of the file; presumably they are filled in with str.format()
# by the caller - TODO confirm.
# NOTE(review): EMAIL_TEMPLATE mixes named fields with one anonymous "{}" field, so str.format() would need exactly
# one positional argument (presumably the expiration time in hours) in addition to the keywords - verify the caller.
EMAIL_SUBJECT = "[SOCtools] New account for {name}"
EMAIL_TEMPLATE = """
Dear {name},
a user account has been created for you in the SOCtools system running at {base_url}.
Your username: {username}
You can authenticate to the system using a personal certificate which has been created for you.
Please, download the certificate from the following link and install it into your browser:
{access_url}
Don't share the link with anyone! It allows to download your certificate, including the private key.
The link will expire in {} hours.
"""
class CertError(Exception):
    """Error raised when an external certificate command (easyrsa, openssl) fails."""
# =========================
# Public interface
def generate_certificate(cn: str):
    """Issue a new x509 client certificate (with a fresh private key) for a user.

    The work is delegated to the 'easyrsa' script; according to the module notes the results end up in the
    Easy-RSA directory:
    - cert (pem): <CA_DIR>/issued/<cn>.crt
    - key (pem): <CA_DIR>/private/<cn>.key
    - both (p12): <CA_DIR>/private/<cn>.p12

    :param cn: CN (common name) to fill in the certificate
    :raise ValueError: if `cn` does not pass `_check_cn`
    :raise CertError: if easyrsa exits with a non-zero return code (message carries up to 500 chars of its stderr)
    """
    _check_cn(cn)

    # Build the key+cert pair (stored in PEM format)
    easyrsa_cmd = [EASYRSA, "build-client-full", cn, "nopass"]
    print("Running command: " + " ".join(easyrsa_cmd))

    # Only stderr is captured; it is decoded leniently so that unexpected bytes can't raise a decoding error
    proc = subprocess.run(
        easyrsa_cmd,
        env=EASYRSA_ENV,
        stderr=subprocess.PIPE,
        encoding="ascii",
        errors="backslashreplace",
    )
    if proc.returncode == 0:
        return
    raise CertError(f"Can't create a certificate for '{cn}': {proc.stderr[:500]}")
# TODO: do the same for PEM? It would allow to download encrypted PEM-formatted key, but otherwise it's not needed
def export_p12_certificate(cn: str, password: str="") -> BinaryIO:
    """
    Export user's certificate+key into a PKCS12 (.p12) bundle encrypted by given password

    The bundle is produced by 'openssl pkcs12 -export' into a uniquely named temporary file inside the CA directory.
    The password is handed to openssl over its standard input, so it shows up neither in the process list nor in
    the "Running command" log line.

    :param cn: CN (common name) identifying the certificate
    :param password: Password to encrypt the file (must not contain a newline, since openssl reads a single line)
    :return: Binary file object opened for reading the .p12 data. The underlying temporary file has already been
             unlinked, so the caller only has to close the object (nothing has to be removed from disk).
    :raise ValueError: if `cn` does not pass `_check_cn` or `password` contains a newline
    :raise CertError: if openssl exits with a non-zero return code (message carries up to 500 chars of its stderr)
    """
    import tempfile  # function-scope import, the module doesn't import tempfile at the top

    _check_cn(cn)
    if "\n" in password:
        # openssl would silently use only the part before the first newline
        raise ValueError("Password must not contain a newline")
    crt_file, key_file = get_pem_files(cn)

    # Reserve a unique temporary file, so that parallel exports for the same user can't clash;
    # mkstemp also creates it readable by the owner only (it is going to contain the private key).
    fd, out_file = tempfile.mkstemp(prefix=f"~tmp.{cn}.", suffix=".p12", dir=config.CA_DIR)
    os.close(fd)
    try:
        # Export cert+key (using openssl directly, since easy-rsa doesn't allow to pass the password non-interactively)
        cmd = ["openssl", "pkcs12", "-export", "-out", out_file, "-in", crt_file, "-inkey", key_file,
               "-passout", "stdin"]
        print(f"Running command: {' '.join(cmd)}")
        result = subprocess.run(cmd, input=(password + "\n").encode("utf-8"), stderr=subprocess.PIPE)
        if result.returncode != 0:
            stderr_text = result.stderr.decode("ascii", errors="backslashreplace")
            raise CertError(f"Can't export p12 certificate for '{cn}': {stderr_text[:500]}")
        # Open the file before its directory entry is removed below; the data stay available through the
        # returned object until it is closed.
        return open(out_file, "rb")
    finally:
        # Never leave the exported private key on disk, not even when openssl or open() failed
        os.remove(out_file)
def revoke_certificate(cn: str):
    """Revoke a previously issued x509 certificate (not implemented yet).

    :param cn: CN (common name) identifying the certificate
    :raise ValueError: if `cn` does not pass `_check_cn`
    :raise NotImplementedError: always (for any valid `cn`), the revocation itself is still to be written
    """
    _check_cn(cn)
    # TODO: implement the revocation by calling easyrsa, along the lines of generate_certificate:
    #   cmd = [EASYRSA, "TODO", cn]
    #   result = subprocess.run(cmd, env=EASYRSA_ENV, stderr=subprocess.PIPE)
    #   if result.returncode != 0:
    #       raise CertError(f"Can't revoke the certificate for '{cn}': {result.stderr[:500]}")
    raise NotImplementedError()
def get_pem_files(cn: str):
    """Build the paths of the PEM-formatted certificate and private key files of given user.

    The paths are only computed, existence of the files is not checked.

    :param cn: CN (common name) identifying the certificate
    :return: Tuple (path to <cn>.crt in "<CA_DIR>/issued", path to <cn>.key in "<CA_DIR>/private")
    :raise ValueError: if `cn` does not pass `_check_cn`
    """
    _check_cn(cn)
    crt_path = os.path.join(config.CA_DIR, "issued", cn + ".crt")
    key_path = os.path.join(config.CA_DIR, "private", cn + ".key")
    return crt_path, key_path
def generate_access_token(cn: str) -> str:
    """
    Generate and store a new access token (16 random chars) allowing the user to download his/her certificate.

    The token is drawn from a cryptographically secure random source, because anybody who knows it can download
    the user's private key. It is stored in the token file together with the username and an expiration time
    (current UTC time + TOKEN_EXPIRATION_HOURS).

    :param cn: username (CN from certificate)
    :return the generated token
    :raise ValueError: if `cn` contains a comma or a line break (it couldn't be stored in the token file)
    :raise RuntimeError: if the generated token already exists (practically impossible)
    """
    import secrets  # function-scope import, the module doesn't import secrets at the top

    # The token file is a line-based, comma-separated list - such a username would make it unparsable
    if "," in cn or "\n" in cn or "\r" in cn:
        raise ValueError("Invalid username/common_name")

    token_mapping = _read_token_file()

    # Generate 16 random characters (letters and numbers)
    alphabet = string.ascii_letters + string.digits
    new_token = ''.join(secrets.choice(alphabet) for _ in range(16))

    # Check that the token doesn't exist, yet (just from principle, the chance is extremely small)
    if new_token in token_mapping:
        raise RuntimeError("Generated a random token that already exists! This should only happen once in 4,7e28 "
                           "trials on average - either you're impossibly (un)lucky or something's wrong.")

    # Set expiration time
    exp_time = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRATION_HOURS)

    # Add the new token, username and exp. time to the list and write it to the file
    token_mapping[new_token] = (cn, exp_time)
    _write_token_file(token_mapping)

    return new_token
def get_username_by_token(token: str) -> Optional[str]:
    """Look up the username a download token was generated for.

    :param token: access token as returned by `generate_access_token`
    :return: the username, or None if the token is unknown or its expiration time (UTC) has already passed
    """
    record = _read_token_file().get(token)
    if record is None:
        return None  # non-existent token
    owner, expires_at = record
    # A token is still valid at the very moment of its expiration time, it becomes invalid right after it
    return owner if datetime.utcnow() <= expires_at else None
# =========================
# Auxiliary functions
def _check_cn(cn: str):
"""
Check CN validity - it must be possible to store it as a filename, without changing directory
:raise ValueError
"""
if (".." in cn or "/" in cn or # this would allow to access files outside the CA directory
cn == "" or len(cn) > 64): # don't allow empty or too long usernames
raise ValueError("Invalid username/common_name")
def _read_token_file() -> Dict[str, Tuple[str, datetime]]:
    """
    Load whole token file and return its contents as dict token -> (username, expiration time)
    If token file doesn't exist, create a new empty one.

    Empty lines and lines starting with "#" are skipped, every other line must have the
    "token,username,expiration-time" format with the time written as "YYYY-MM-DDTHH:MM:SS".

    :return: dict mapping each token to a tuple (username, expiration time as a naive datetime)
    :raise ValueError: if a line doesn't consist of exactly three comma-separated fields or the time can't be parsed
    """
    if not os.path.exists(TOKEN_FILE):
        print(f"NOTICE: Certificate access token file ({TOKEN_FILE}) doesn't exist, will create a new empty one.", file=sys.stderr)
        _write_token_file({})
    token_mapping = {}
    # Use a context manager, so the file is closed even when a malformed line raises an exception
    with open(TOKEN_FILE, "r") as f:
        for line in f:
            line = line.strip()
            if line == "" or line.startswith("#"):
                continue
            token, username, expiration = line.split(",")
            expiration = datetime.strptime(expiration, '%Y-%m-%dT%H:%M:%S')
            token_mapping[token] = (username, expiration)
    return token_mapping
def _write_token_file(token_mapping: Dict[str, Tuple[str, datetime]]):
    """Overwrite the token file with the given mapping.

    The file starts with TOKEN_FILE_HEADER, followed by one "token,username,expiration-time" line per entry
    (time formatted as "YYYY-MM-DDTHH:MM:SS").

    :param token_mapping: dict token -> (username, expiration time)
    """
    time_format = '%Y-%m-%dT%H:%M:%S'
    with open(TOKEN_FILE, "w") as token_file:
        token_file.write(TOKEN_FILE_HEADER)
        token_file.writelines(
            f"{token},{user},{expires.strftime(time_format)}\n"
            for token, (user, expires) in token_mapping.items()
        )