"""Functions to manage user accounts in MISP""" from typing import List, Dict, Optional import requests from datetime import datetime from config import config # Base URL to MISP API endpoints MISP_API_BASE_URL = "https://{soctools_proxy}:6443" # MISP API documentation: https://www.circl.lu/doc/misp/automation/#user-management class MISPError(Exception): pass class MISPUserNotFoundError(MISPError): pass class MISPUserExistsError(MISPError): pass class MISPUnexpectedReplyError(MISPError): pass # ========================= # Public interface def misp_get_users() -> List[Dict]: """ List users defined in MISP :return List of dicts with keys 'id', 'email', 'org', 'role', 'login', 'created' (datetime), 'last_login' (datetime or None) :raise MISPUnexpectedReplyError """ url = MISP_API_BASE_URL.format(soctools_proxy=config['soctoolsproxy']) + "/admin/users" resp = _send_request("get", url) if not resp.ok: raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected reply {resp.status_code}") users = [] try: for user_entry in resp.json(): created = user_entry["User"]["date_created"] # string unix timestamp (or null for pre-created admin account) created = datetime.utcfromtimestamp(int(created)) if created else None last_login = user_entry["User"]["last_login"] # string unix timestamp ("0" means no login yet) last_login = datetime.utcfromtimestamp(int(last_login)) if last_login and last_login != "0" else None users.append({ "id": user_entry["User"]["id"], "email": user_entry["User"]["email"], "org": user_entry["Organisation"]["name"], "role": user_entry["Role"]["name"], "created": created, # time of account creation (datetime) "last_login": last_login, }) except (ValueError, TypeError, KeyError) as e: print(f"Can't get list of users from MISP: Unexpected content received: {type(e).__name__}: {e})") raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected content received") return users def misp_add_user(user: 'UserAccount') -> None: """Add a new user to MISP :raise MISPUnexpectedReplyError, MISPUserExistsError """ user_email = user.email user_role_id = 1 # should be "admin", no support for other roles yet user_org_id = 1 # use the first org, no support for selection yet url = MISP_API_BASE_URL.format(soctools_proxy=config['soctoolsproxy']) + "/admin/users/add" data = { "email": user_email, "org_id": user_org_id, "role_id": user_role_id, "external_auth_required": "1", "external_auth_key": user_email, "change_pw": "0", } resp = _send_request("post", url, data) if not resp.ok: if "An account with this email address already exists" in resp.text: raise MISPUserExistsError() else: print(f"Can't add user to MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}") raise MISPUnexpectedReplyError(f"Can't add user to MISP: Unexpected reply {resp.status_code}") return None def misp_edit_user(old_email, new_email) -> None: """Edit existing user in MISP (only email can be changed, other params aren't stored in MISP) :raise MISPUnexpectedReplyError, MISPUserExistsError """ user_id = _get_id_by_email(old_email) # raises MISPUserNotFoundError if user not found url = MISP_API_BASE_URL.format(soctools_proxy=config['soctoolsproxy']) + "/admin/users/edit/" + user_id data = { "email": new_email, "external_auth_required": "1", "external_auth_key": new_email, "change_pw": "0", } resp = _send_request("post", url, data) if not resp.ok: if "An account with this email address already exists" in resp.text: raise MISPUserExistsError() else: print(f"Can't edit user in MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}") raise MISPUnexpectedReplyError(f"Can't edit user in MISP: Unexpected reply {resp.status_code}") return None def misp_delete_user(user_email: str) -> None: """Delete a user from MISP :raises MISPUnexpectedReplyError, MISPUserNotFoundError """ user_id = _get_id_by_email(user_email) # raises MISPUserNotFoundError if user not found url = MISP_API_BASE_URL.format(soctools_proxy=config['soctoolsproxy']) + "/admin/users/delete/" + user_id resp = _send_request("post", url) if not resp.ok: print(f"Can't delete user from MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}") raise MISPUnexpectedReplyError(f"Can't delete user from MISP: Unexpected reply {resp.status_code}") return None # ========================= # Auxiliary functions def _send_request(method:str, url:str, data:Optional[dict]=None): try: api_key = open(config['misp_api_key_file'], 'r').read(100).strip() # read max 100 B, the key should never be so long except IOError as e: raise IOError(f"Can't load MISP API key from file: {e}") return getattr(requests, method)( url, headers={ "Authorization": api_key, "Accept": "application/json", }, verify=config['ca_cert_file'], json=data ) def _get_id_by_email(email:str) -> str: url = MISP_API_BASE_URL.format(soctools_proxy=config['soctoolsproxy']) + "/admin/users" resp = _send_request("get", url) if not resp.ok: raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected reply {resp.status_code}") try: for user_entry in resp.json(): if user_entry["User"]["email"] == email: return user_entry["User"]["id"] raise MISPUserNotFoundError() except (ValueError, TypeError, KeyError) as e: raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected content received")