From c993bf3cfcecb7f648c36c75b2745f3efca1fbc0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=A1clav=20Barto=C5=A1?= <bartos@cesnet.cz>
Date: Sun, 24 Jul 2022 19:02:17 +0200
Subject: [PATCH] added forgotten misp.py

---
 misp.py | 159 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 159 insertions(+)
 create mode 100644 misp.py

diff --git a/misp.py b/misp.py
new file mode 100644
index 0000000..d5b99da
--- /dev/null
+++ b/misp.py
@@ -0,0 +1,159 @@
+"""Functions to manage user accounts in MISP"""
+
+from typing import List, Dict, Optional
+import requests
+from datetime import datetime
+import re
+from operator import itemgetter
+import urllib.parse
+
+import config
+
+# Base URL to MISP API endpoints
+MISP_API_BASE_URL = "https://{soctools_proxy}:6443"
+
+# MISP API documentation: https://www.circl.lu/doc/misp/automation/#user-management
+
+class MISPError(Exception):
+    pass
+
+class MISPUserNotFoundError(MISPError):
+    pass
+
+class MISPUserExistsError(MISPError):
+    pass
+
+class MISPUnexpectedReplyError(MISPError):
+    pass
+
+
+# =========================
+# Public interface
+
+def misp_get_users() -> List[Dict]:
+    """
+    List users defined in MISP
+
+    :return List of dicts with keys 'id', 'email', 'org', 'role', 'login', 'created' (datetime), 'last_login' (datetime or None)
+    :raise MISPUnexpectedReplyError
+    """
+    url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users"
+
+    resp = _send_request("get", url)
+    if not resp.ok:
+        raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected reply {resp.status_code}")
+    users = []
+    try:
+        for user_entry in resp.json():
+            created = user_entry["User"]["date_created"] # string unix timestamp (or null for pre-created admin account)
+            created = datetime.utcfromtimestamp(int(created)) if created else None
+            last_login = user_entry["User"]["last_login"] # string unix timestamp ("0" means no login yet)
+            last_login = datetime.utcfromtimestamp(int(last_login)) if last_login and last_login != "0" else None
+            users.append({
+                "id": user_entry["User"]["id"],
+                "email": user_entry["User"]["email"],
+                "org": user_entry["Organisation"]["name"],
+                "role": user_entry["Role"]["name"],
+                "created": created, # time of account creation (datetime)
+                "last_login": last_login,
+            })
+    except (ValueError, TypeError, KeyError) as e:
+        print(f"Can't get list of users from MISP: Unexpected content received: {type(e).__name__}: {e})")
+        raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected content received")
+
+    return users
+
+
+def misp_add_user(user: 'UserAccount') -> None:
+    """Add a new user to MISP
+
+    :raise MISPUnexpectedReplyError, MISPUserExistsError
+    """
+    user_email = user.email
+    user_role_id = 1 # should be "admin", no support for other roles yet
+    user_org_id = 1 # use the first org, no support for selection yet
+
+    url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/add"
+    data = {
+        "email": user_email,
+        "org_id": user_org_id,
+        "role_id": user_role_id,
+        "external_auth_required": "1",
+        "external_auth_key": user_email,
+        "change_pw": "0",
+    }
+    resp = _send_request("post", url, data)
+    if not resp.ok:
+        if "An account with this email address already exists" in resp.text:
+            raise MISPUserExistsError()
+        else:
+            print(f"Can't add user to MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}")
+            raise MISPUnexpectedReplyError(f"Can't add user to MISP: Unexpected reply {resp.status_code}")
+    return None
+
+
+def misp_edit_user(old_email, new_email) -> None:
+    """Edit existing user in MISP (only email can be changed, other params aren't stored in MISP)
+
+    :raise MISPUnexpectedReplyError, MISPUserExistsError
+    """
+    user_id = _get_id_by_email(old_email) # raises MISPUserNotFoundError if user not found
+
+    url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/edit/" + user_id
+    data = {
+        "email": new_email,
+        "external_auth_required": "1",
+        "external_auth_key": new_email,
+        "change_pw": "0",
+    }
+    resp = _send_request("post", url, data)
+    if not resp.ok:
+        if "An account with this email address already exists" in resp.text:
+            raise MISPUserExistsError()
+        else:
+            print(f"Can't edit user in MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}")
+            raise MISPUnexpectedReplyError(f"Can't edit user in MISP: Unexpected reply {resp.status_code}")
+    return None
+
+
+def misp_delete_user(user_email: str) -> None:
+    """Delete a user from MISP
+
+    :raises MISPUnexpectedReplyError, MISPUserNotFoundError
+    """
+    user_id = _get_id_by_email(user_email) # raises MISPUserNotFoundError if user not found
+    url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/delete/" + user_id
+    resp = _send_request("post", url)
+    if not resp.ok:
+        print(f"Can't delete user from MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}")
+        raise MISPUnexpectedReplyError(f"Can't delete user from MISP: Unexpected reply {resp.status_code}")
+    return None
+
+
+# =========================
+# Auxiliary functions
+
+def _send_request(method:str, url:str, data:Optional[dict]=None):
+    return getattr(requests, method)(
+        url,
+        headers={
+            "Authorization": config.MISP_API_KEY,
+            "Accept": "application/json",
+        },
+        verify=config.CA_CERT_FILE,
+        json=data
+    )
+
+
+def _get_id_by_email(email:str) -> str:
+    url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users"
+    resp = _send_request("get", url)
+    if not resp.ok:
+        raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected reply {resp.status_code}")
+    try:
+        for user_entry in resp.json():
+            if user_entry["User"]["email"] == email:
+                return user_entry["User"]["id"]
+        raise MISPUserNotFoundError()
+    except (ValueError, TypeError, KeyError) as e:
+        raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected content received")
-- 
GitLab