Skip to content
Snippets Groups Projects
Commit c993bf3c authored by Václav Bartoš's avatar Václav Bartoš
Browse files

added forgotten misp.py

parent f66d5abb
No related branches found
No related tags found
No related merge requests found
misp.py 0 → 100644
"""Functions to manage user accounts in MISP"""
from typing import List, Dict, Optional
import requests
from datetime import datetime
import re
from operator import itemgetter
import urllib.parse
import config
# Base URL to MISP API endpoints
MISP_API_BASE_URL = "https://{soctools_proxy}:6443"
# MISP API documentation: https://www.circl.lu/doc/misp/automation/#user-management
class MISPError(Exception):
pass
class MISPUserNotFoundError(MISPError):
pass
class MISPUserExistsError(MISPError):
pass
class MISPUnexpectedReplyError(MISPError):
pass
# =========================
# Public interface
def misp_get_users() -> List[Dict]:
"""
List users defined in MISP
:return List of dicts with keys 'id', 'email', 'org', 'role', 'login', 'created' (datetime), 'last_login' (datetime or None)
:raise MISPUnexpectedReplyError
"""
url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users"
resp = _send_request("get", url)
if not resp.ok:
raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected reply {resp.status_code}")
users = []
try:
for user_entry in resp.json():
created = user_entry["User"]["date_created"] # string unix timestamp (or null for pre-created admin account)
created = datetime.utcfromtimestamp(int(created)) if created else None
last_login = user_entry["User"]["last_login"] # string unix timestamp ("0" means no login yet)
last_login = datetime.utcfromtimestamp(int(last_login)) if last_login and last_login != "0" else None
users.append({
"id": user_entry["User"]["id"],
"email": user_entry["User"]["email"],
"org": user_entry["Organisation"]["name"],
"role": user_entry["Role"]["name"],
"created": created, # time of account creation (datetime)
"last_login": last_login,
})
except (ValueError, TypeError, KeyError) as e:
print(f"Can't get list of users from MISP: Unexpected content received: {type(e).__name__}: {e})")
raise MISPUnexpectedReplyError(f"Can't get list of users from MISP: Unexpected content received")
return users
def misp_add_user(user: 'UserAccount') -> None:
"""Add a new user to MISP
:raise MISPUnexpectedReplyError, MISPUserExistsError
"""
user_email = user.email
user_role_id = 1 # should be "admin", no support for other roles yet
user_org_id = 1 # use the first org, no support for selection yet
url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/add"
data = {
"email": user_email,
"org_id": user_org_id,
"role_id": user_role_id,
"external_auth_required": "1",
"external_auth_key": user_email,
"change_pw": "0",
}
resp = _send_request("post", url, data)
if not resp.ok:
if "An account with this email address already exists" in resp.text:
raise MISPUserExistsError()
else:
print(f"Can't add user to MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}")
raise MISPUnexpectedReplyError(f"Can't add user to MISP: Unexpected reply {resp.status_code}")
return None
def misp_edit_user(old_email, new_email) -> None:
"""Edit existing user in MISP (only email can be changed, other params aren't stored in MISP)
:raise MISPUnexpectedReplyError, MISPUserExistsError
"""
user_id = _get_id_by_email(old_email) # raises MISPUserNotFoundError if user not found
url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/edit/" + user_id
data = {
"email": new_email,
"external_auth_required": "1",
"external_auth_key": new_email,
"change_pw": "0",
}
resp = _send_request("post", url, data)
if not resp.ok:
if "An account with this email address already exists" in resp.text:
raise MISPUserExistsError()
else:
print(f"Can't edit user in MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}")
raise MISPUnexpectedReplyError(f"Can't edit user in MISP: Unexpected reply {resp.status_code}")
return None
def misp_delete_user(user_email: str) -> None:
"""Delete a user from MISP
:raises MISPUnexpectedReplyError, MISPUserNotFoundError
"""
user_id = _get_id_by_email(user_email) # raises MISPUserNotFoundError if user not found
url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users/delete/" + user_id
resp = _send_request("post", url)
if not resp.ok:
print(f"Can't delete user from MISP: Unexpected reply {resp.status_code}: {resp.text[:500]}")
raise MISPUnexpectedReplyError(f"Can't delete user from MISP: Unexpected reply {resp.status_code}")
return None
# =========================
# Auxiliary functions
def _send_request(method:str, url:str, data:Optional[dict]=None):
return getattr(requests, method)(
url,
headers={
"Authorization": config.MISP_API_KEY,
"Accept": "application/json",
},
verify=config.CA_CERT_FILE,
json=data
)
def _get_id_by_email(email:str) -> str:
url = MISP_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/admin/users"
resp = _send_request("get", url)
if not resp.ok:
raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected reply {resp.status_code}")
try:
for user_entry in resp.json():
if user_entry["User"]["email"] == email:
return user_entry["User"]["id"]
raise MISPUserNotFoundError()
except (ValueError, TypeError, KeyError) as e:
raise MISPUnexpectedReplyError(f"Can't find id of user with email '{email}': Unexpected content received")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment