Skip to content
Snippets Groups Projects
Commit 12c0146f authored by Václav Bartoš's avatar Václav Bartoš
Browse files

added a page allowing to download a user certificate

parent 02bcbd83
No related branches found
No related tags found
No related merge requests found
...@@ -22,9 +22,9 @@ MGMT_USER_KEY_PATH = os.path.join(SOCTOOLS_BASE, "secrets/CA/private/soctools-us ...@@ -22,9 +22,9 @@ MGMT_USER_KEY_PATH = os.path.join(SOCTOOLS_BASE, "secrets/CA/private/soctools-us
# MGMT_USER_CERT_PATH = os.path.join(SOCTOOLS_BASE, "secrets/certificates/SOC_Admin.crt.pem") # MGMT_USER_CERT_PATH = os.path.join(SOCTOOLS_BASE, "secrets/certificates/SOC_Admin.crt.pem")
# MGMT_USER_KEY_PATH = os.path.join(SOCTOOLS_BASE, "secrets/certificates/SOC_Admin.key.pem") # MGMT_USER_KEY_PATH = os.path.join(SOCTOOLS_BASE, "secrets/certificates/SOC_Admin.key.pem")
# Following parameters are set up dynamically by load_config() in main.py
# Following parameters are set up dynamically by load_config() in main.py
SOCTOOLSPROXY = None SOCTOOLSPROXY = None
USER_MGMT_BASE_URL = None
KEYCLOAK_BASE_URL = None KEYCLOAK_BASE_URL = None
KEYCLOAK_USERS_URL = None KEYCLOAK_USERS_URL = None
......
...@@ -30,6 +30,9 @@ def load_config(): ...@@ -30,6 +30,9 @@ def load_config():
# Get FQDN of the main server # Get FQDN of the main server
config.SOCTOOLSPROXY = variables["soctoolsproxy"] config.SOCTOOLSPROXY = variables["soctoolsproxy"]
assert re.match('[a-zA-Z0-9.-]+', config.SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{config.VARIABLES_FILE}' is not a valid domain name." assert re.match('[a-zA-Z0-9.-]+', config.SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{config.VARIABLES_FILE}' is not a valid domain name."
# Set base URL for user management (this web service)
# TODO: load ports (or whole base URLs) from config as well
config.USER_MGMT_BASE_URL = f"http://{config.SOCTOOLSPROXY}:8080"
# Set base URL to Keycloak # Set base URL to Keycloak
config.KEYCLOAK_BASE_URL = f"https://{config.SOCTOOLSPROXY}:12443" config.KEYCLOAK_BASE_URL = f"https://{config.SOCTOOLSPROXY}:12443"
config.KEYCLOAK_USERS_URL = config.KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users" config.KEYCLOAK_USERS_URL = config.KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users"
...@@ -82,7 +85,7 @@ class UserAccount: ...@@ -82,7 +85,7 @@ class UserAccount:
"lastName": self.lastname, "lastName": self.lastname,
"email": self.email, "email": self.email,
"attributes": { "attributes": {
"CN": [self.cn], "CN": [self.cn], # TODO: should be equal to username
"DN": [f"CN={self.cn}"] "DN": [f"CN={self.cn}"]
}, },
} }
...@@ -234,9 +237,6 @@ def kc_delete_user(userid: str) -> None: ...@@ -234,9 +237,6 @@ def kc_delete_user(userid: str) -> None:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
# *** NiFi ***
# *** Flask endpoints and forms *** # *** Flask endpoints and forms ***
class AddUserForm(FlaskForm): class AddUserForm(FlaskForm):
...@@ -305,6 +305,18 @@ def add_user(): ...@@ -305,6 +305,18 @@ def add_user():
lastname=form_user.lastname.data, lastname=form_user.lastname.data,
cn=form_user.cn.data, cn=form_user.cn.data,
dn=f"CN={form_user.cn.data}") dn=f"CN={form_user.cn.data}")
# Generate certificate
try:
certificates.generate_certificate(user.cn)
flash(f'Certificate for user "{user.username}" was successfully created.', "success")
#token = certificates.generate_access_token(user.cn)
#print(f"Certificate access token for '{user.cn}': {token}")
# TODO: send token to email automatically?
except certificates.CertError as e:
flash(str(e), "error")
return redirect("/") # don't continue creating user accounts in services
# Keycloak # Keycloak
try: try:
kc_add_user(user) kc_add_user(user)
...@@ -399,6 +411,8 @@ def delete_user(username: str): ...@@ -399,6 +411,8 @@ def delete_user(username: str):
flash(f"Error: Can't get user info from KeyCloak: {e}", "error") flash(f"Error: Can't get user info from KeyCloak: {e}", "error")
return redirect("/") return redirect("/")
# TODO revoke certificate
# Keycloak # Keycloak
try: try:
kc_delete_user(user_spec.kcid) kc_delete_user(user_spec.kcid)
...@@ -427,10 +441,10 @@ def delete_user(username: str): ...@@ -427,10 +441,10 @@ def delete_user(username: str):
return redirect("/") return redirect("/")
@app.route("/export_certificate/") @app.route("/export_certificate")
def export_certificate(): def export_certificate():
""" """
Show the page allow certificate download, or provide the cert file, if "format" is given. Show the page allowing certificate download, or provide the cert file, if "format" is given.
Expects two parameters passed via URL: Expects two parameters passed via URL:
- "token" (mandatory) - authentication token allowing to access the certificate of the associated user. - "token" (mandatory) - authentication token allowing to access the certificate of the associated user.
...@@ -444,44 +458,74 @@ def export_certificate(): ...@@ -444,44 +458,74 @@ def export_certificate():
if not username: if not username:
return make_response("ERROR: Invalid or expired token", 403) return make_response("ERROR: Invalid or expired token", 403)
user_spec = kc_get_user_by_name(username)
if user_spec is None:
flash(f"ERROR: No such user ('{username}')", "error")
return redirect("/")
# If format is given, export and serve the certificate file # If format is given, export and serve the certificate file
format = request.args.get("format", None) format = request.args.get("format", None)
if format == "p12": if format == "p12":
pwd = request.args.get("password", "") pwd = request.args.get("password", "")
return send_file(certificates.export_p12_certificate(username, pwd), return send_file(certificates.export_p12_certificate(user_spec.cn, pwd),
attachment_filename=f"{username}.p12", mimetype="application/x-pkcs12") attachment_filename=f"{user_spec.cn}.p12", mimetype="application/x-pkcs12")
elif format == "pem-cert": elif format == "pem-cert":
return send_file(certificates.get_pem_files(username)[0], return send_file(certificates.get_pem_files(user_spec.cn)[0],
attachment_filename=f"{username}.crt", mimetype="application/x-pem-file") attachment_filename=f"{user_spec.cn}.crt", mimetype="application/x-pem-file")
elif format == "pem-key": elif format == "pem-key":
return send_file(certificates.get_pem_files(username)[1], return send_file(certificates.get_pem_files(user_spec.cn)[1],
attachment_filename=f"{username}.key", mimetype="application/x-pem-file") attachment_filename=f"{user_spec.cn}.key", mimetype="application/x-pem-file")
# Otherwise show the HTML page # Otherwise show the HTML page
return render_template("export_certificate.html", username=username) # TODO return render_template("export_certificate.html", token=token, username=username)
@app.route("/send_token/<username>") @app.route("/send_token/<username>")
def send_token(username: str): def send_token(username: str):
#TODO """
return make_response("TODO") Send an email with certificate access token to the user.
"""
# Check that the user exists
user_spec = kc_get_user_by_name(username)
if user_spec is None:
flash(f"ERROR: No such user ('{username}')", "error")
return redirect("/")
# Generate token
try:
token = certificates.generate_access_token(username)
except Exception as e:
flash(f"ERROR: {e}", "error")
return redirect("/")
access_url = f"{config.USER_MGMT_BASE_URL}/export_certificate?token={token}"
print(f"Certificate access URL for '{username}': {access_url}")
# Send the token via email
# TODO
flash(f"Email successfully sent to '{user_spec.email}'", "success")
return redirect("/")
# TODO: # TODO:
# (re)send cert-access token for existing user # (re)send cert-access token for existing user - DONE (on click in table)
# automatically create certificate when creating new user (optionally automatically send email with token) # automatically create certificate when creating new user (optionally automatically send email with token) - DONE
# revoke and delete certificate when user is deleted
# make CN=username (co cert filename also matches the username (it's stored by CN))
@app.route("/test_cert/<func>") # @app.route("/test_cert/<func>")
def test_cert_endpoint(func): # def test_cert_endpoint(func):
# run any function from "certificates" module # # run any function from "certificates" module
result = str(getattr(certificates, func)(**request.args)) # result = str(getattr(certificates, func)(**request.args))
return make_response(result) # return make_response(result)
# TODO other services (besides Keycloak) # TODO other services (besides Keycloak)
# - NiFi - DONE # - NiFi - DONE
# - MISP - DONE # - MISP - DONE
# - Kibana? # - Kibana? - account doesn't need to be added, but it needs to add privileges
# - TheHive + Cortex # - TheHive + Cortex
# TODO authentication/authorization to this GUI # TODO authentication/authorization to this GUI
......
{% extends "base.html" %}
{% block body %}
<p>The certificate for user '{{ username }}', which allows to access various SOCtools services,
can be downloaded in the following formats:</p>
{# TODO password field/prompt #}
<p><a href="{{ url_for('export_certificate') }}?token={{token}}&amp;format=p12">PKCS12 (.p12)</a> - contains both certificate and matching private key &nbsp; <span style="font-style: italics; color: #777;">← You probably need this to import into your browser</span></p>
<p><a href="{{ url_for('export_certificate') }}?token={{token}}&amp;format=pem-cert">PEM (certificate) (.crt)</a></p>
<p><a href="{{ url_for('export_certificate') }}?token={{token}}&amp;format=pem-key">PEM (private key) (.key)</a></p>
{% endblock %}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment