Skip to content
Snippets Groups Projects
Commit c9c3218c authored by Václav Bartoš's avatar Václav Bartoš
Browse files

nifi: functions to add and delete users implemented

NiFi connector is now complete
parent 26a8fced
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@ from typing import List, Dict, Optional
import requests
import re
from operator import itemgetter
import urllib.parse
import config
config.SOCTOOLSPROXY = "gn4soctools3.liberouter.org"
......@@ -27,6 +28,71 @@ class NifiUserExistsError(NifiError):
class NifiUnexpectedReplyError(NifiError):
pass
# =========================
# Public interface
def nifi_get_users() -> List[Dict]:
"""
List users defined in NiFi
:return List of dicts with keys 'id', 'name', 'groups' (list of group names)
:raise NifiUnexpectedReplyError
"""
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users"
token = _nifi_get_jwt()
resp = _send_request('get', url, token)
if not resp.ok:
raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected reply {resp.status_code}")
users = []
try:
raw_users = resp.json()['users']
#print(raw_users)
for user in raw_users:
users.append({
'id': user["component"]["id"],
'name': user["component"]["identity"],
'groups': [g["component"]["identity"] for g in user["component"]["userGroups"]],
})
users.sort(key=itemgetter('name'))
#print(users)
except (ValueError, TypeError, KeyError):
raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected content received")
return users
def nifi_add_user(user: 'UserAccount'):
"""Add a new user to NiFi
:raise NifiUnexpectedReplyError
"""
user_name = user.username
user_group = "Administrators" # no support for other groups in NiFi, yet
token = _nifi_get_jwt()
new_user_spec = _add_user(user_name, token)
_add_user_to_group(new_user_spec["id"], user_group, token)
return None
def nifi_delete_user(user_name: str):
"""Delete a user from NiFi
:raises NifiError
"""
token = _nifi_get_jwt()
user_spec = _get_user_by_name(user_name, token)
if user_spec is None:
raise NifiUserNotFoundError()
_delete_user(user_spec, token)
# =========================
# Auxiliary functions
def _nifi_get_jwt() -> str:
"""
Get OIDC token (JWT) for authenticating API requests
......@@ -79,58 +145,113 @@ def _nifi_get_jwt() -> str:
# Now, we are authenticated to NiFi, identified by a cookie (stored within our session object).
# Use the cookie and ask for the JWT token we need for API requests
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/access/oidc/exchange"
print(f"_nifi_get_jwt: POST request to: {url}")
resp = session.post(url) # POST must be used even though no data are being sent
if not resp.ok:
raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
return resp.text
def _send_request(method:str, url:str, token:str, data:Optional[dict]=None):
return getattr(requests, method)(
url,
headers={"Authorization": "Bearer " + token},
verify=config.CA_CERT_FILE,
json=data
)
def nifi_get_users() -> List[Dict]:
"""
List users defined in NiFi
:return List of dicts with keys 'id', 'name', 'groups' (list of group names)
:raise NifiUnexpectedReplyError
"""
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users"
token = _nifi_get_jwt()
resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
def _get_group_by_name(group_name: str, token=None) -> Optional[dict]:
"""Return complete user-group specification of a group with given name (or None if not found)"""
if not token:
token = _nifi_get_jwt()
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/user-groups"
resp = _send_request('get', url, token)
if not resp.ok:
raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected reply {resp.status_code}")
users = []
raise NifiUnexpectedReplyError(f"Can't get list of user groups from NiFi: Unexpected reply {resp.status_code}")
try:
raw_users = resp.json()['users']
#print(raw_users)
for user in raw_users:
users.append({
'id': user["component"]["id"],
'name': user["component"]["identity"],
'groups': [g["component"]["identity"] for g in user["component"]["userGroups"]],
})
users.sort(key=itemgetter('name'))
#print(users)
groups = resp.json()["userGroups"]
for g in groups:
if g["component"]["identity"] == group_name:
return g
else:
return None # user-group with given name not found
except (ValueError, TypeError, KeyError):
raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected content received")
return users
raise NifiUnexpectedReplyError(f"Can't get list of user groups from NiFi: Unexpected content received")
def _get_group_id_by_name(group_name: str, token=None) -> Optional[str]:
"""Return ID of user-group with given name (or None if not found)"""
return _get_group_by_name(group_name, token)["component"]["id"]
def nifi_add_user(user: 'UserAccount'):
"""Add a new user to NiFi
:raises NifiError
def _add_user_to_group(user_id: str, group_name: str, token=None):
"""
user_name = user.username
user_group = "Administrators" # no support for other groups in NiFi, yet
Add given user-id to given user-group (identified by its name, not id)
"""
if not token:
token = _nifi_get_jwt()
# TODO
# To update a group, we have to rewrite it by a new specification (users are specified just by their id) ...
# Get complete specification of the group
group = _get_group_by_name(group_name)
group_id = group["component"]["id"]
def nifi_delete_user(user_name: str):
"""Delete a user from NiFi
# Edit - add the user
group["component"]["users"].append({"id": user_id})
# Save new group
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/user-groups/" + group_id
resp = _send_request('put', url, token, data=group)
if not resp.ok:
raise NifiUnexpectedReplyError(f"Can't assign user to group in NiFi: Unexpected reply: {resp.status_code} {resp.text[:1000]}")
def _add_user(user_name: str, token=None):
if not token:
token = _nifi_get_jwt()
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users"
post_data = {
"revision": {"version": 0},
"component": {"identity": user_name}
}
resp = _send_request('post', url, token, data=post_data)
if not resp.ok:
if "already exists" in resp.text:
raise NifiUserExistsError()
raise NifiUnexpectedReplyError(f"Can't add user to NiFi: Unexpected reply: {resp.status_code} {resp.text[:1000]}")
user_spec = resp.json()
return user_spec
def _get_user_by_name(user_name: str, token=None) -> Optional[dict]:
"""Get user specification of the user with given name (or None if no such user found)"""
if not token:
token = _nifi_get_jwt()
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/search-results?q=" + urllib.parse.quote(user_name)
resp = _send_request('get', url, token)
if not resp.ok:
raise NifiUnexpectedReplyError(f"Can't get user info from NiFi: Unexpected reply: {resp.status_code} {resp.text[:1000]}")
matched_users = resp.json()["users"]
for u in matched_users:
if u["component"]["identity"] == user_name:
return u
return None
def _delete_user(user_spec: dict, token=None):
"""Delete user (its full spec is passed)"""
if not token:
token = _nifi_get_jwt()
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users/" + user_spec["id"]
url += "?version=" + str(int(user_spec["revision"]["version"]))
print("DELETE " + url)
resp = _send_request('delete', url, token)
if not resp.ok:
raise NifiUnexpectedReplyError(f"Can't delete user from NiFi: Unexpected reply: {resp.status_code} {resp.text[:1000]}")
user_spec = resp.json()
return user_spec
:raises NifiError
"""
# TODO
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment