diff --git a/misp.py b/misp.py new file mode 100644 index 0000000000000000000000000000000000000000..d5b99da4f69b3d6c0b6a0e0e2d1585ba0a316e10 --- /dev/null +++ b/misp.py @@ -0,0 +1,159 @@ +"""Functions to manage user accounts in MISP""" + +from typing import List, Dict, Optional +import requests +from datetime import datetime +import re +from operator import itemgetter +import urllib.parse + +import config + +# Base URL to MISP API endpoints +MISP_API_BASE_URL = "https://{soctools_proxy}:6443" + +# MISP API documentation: https://www.circl.lu/doc/misp/automation/#user-management + +class MISPError(Exception): + pass + +class MISPUserNotFoundError(MISPError): + pass + +class MISPUserExistsError(MISPError): + pass + +class MISPUnexpectedReplyError(MISPError): + pass + + +# ========================= +# Public interface + +def misp_get_users() -> List[Dict]: + """ + List users defined in MISP + + :return List of dicts with keys 'id', 'email', 'org', 'role', 'login', 'created' (datetime), 'last_login' (datetime or None) + :raise MISPUnexpectedReplyError + """ + url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users" + + resp = _send_request("get", url) + if not resp.ok: + raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected reply {resp.status_code}") + users = [] + try: + for user_entry in resp.json(): + created = user_entry["User"]["date_created"] # string unix timestamp (or null for pre-created admin account) + created = datetime.utcfromtimestamp(int(created)) if created else None + last_login = user_entry["User"]["last_login"] # string unix timestamp ("0" means no login yet) + last_login = datetime.utcfromtimestamp(int(last_login)) if last_login and last_login != "0" else None + users.append({ + "id": user_entry["User"]["id"], + "email": user_entry["User"]["email"], + "org": user_entry["Organisation"]["name"], + "role": user_entry["Role"]["name"], + "created": created, # time of account creation (datetime) + "last_login": last_login, + }) + except (ValueError, TypeError, KeyError) as e: + print(f"Can't get list of users from MISP: Unexpected content received: {type(e).__name__}: {e})") + raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected content received") + + return users + + +def misp_add_user(user: 'UserAccount') -> None: + """Add a new user to MISP + + :raise MISPUnexpectedReplyError, MISPUserExistsError + """ + user_email = user.email + user_role_id = 1 # should be "admin", no support for other roles yet + user_org_id = 1 # use the first org, no support for selection yet + + url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/add" + data = { + "email": user_email, + "org_id": user_org_id, + "role_id": user_role_id, + "external_auth_required": "1", + "external_auth_key": user_email, + "change_pw": "0", + } + resp = _send_request("post", url, data) + if not resp.ok: + if "An account with this email address already exists" in resp.text: + raise MISPUserExistsError() + else: + print(f"Can't add user to MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}") + raise MISPUnexpectedReplyError(f"Can't add user to MISP: Unexpected reply {resp.status_code}") + return None + + +def misp_edit_user(old_email, new_email) -> None: + """Edit existing user in MISP (only email can be changed, other params aren't stored in MISP) + + :raise MISPUnexpectedReplyError, MISPUserExistsError + """ + user_id = _get_id_by_email(old_email) # raises MISPUserNotFoundError if user not found + + url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/edit/" + user_id + data = { + "email": new_email, + "external_auth_required": "1", + "external_auth_key": new_email, + "change_pw": "0", + } + resp = _send_request("post", url, data) + if not resp.ok: + if "An account with this email address already exists" in resp.text: + raise MISPUserExistsError() + else: + print(f"Can't edit user in MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}") + raise MISPUnexpectedReplyError(f"Can't edit user in MISP: Unexpected reply {resp.status_code}") + return None + + +def misp_delete_user(user_email: str) -> None: + """Delete a user from MISP + + :raises MISPUnexpectedReplyError, MISPUserNotFoundError + """ + user_id = _get_id_by_email(user_email) # raises MISPUserNotFoundError if user not found + url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/delete/" + user_id + resp = _send_request("post", url) + if not resp.ok: + print(f"Can't delete user from MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}") + raise MISPUnexpectedReplyError(f"Can't delete user from MISP: Unexpected reply {resp.status_code}") + return None + + +# ========================= +# Auxiliary functions + +def _send_request(method:str, url:str, data:Optional[dict]=None): + return getattr(requests, method)( + url, + headers={ + "Authorization": config.MISP_API_KEY, + "Accept": "application/json", + }, + verify=config.CA_CERT_FILE, + json=data + ) + + +def _get_id_by_email(email:str) -> str: + url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users" + resp = _send_request("get", url) + if not resp.ok: + raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected reply {resp.status_code}") + try: + for user_entry in resp.json(): + if user_entry["User"]["email"] == email: + return user_entry["User"]["id"] + raise MISPUserNotFoundError() + except (ValueError, TypeError, KeyError) as e: + raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected content received")