Skip to content
Snippets Groups Projects
Commit 26a8fced authored by Václav Bartoš's avatar Václav Bartoš
Browse files

NiFi user management rewritten to use NiFi API

API is authenticated via token requested from Keycloak by simulating normal user login.
A special user account "soctools-user-mgmt" is used to manage user in NiFi (this must be pre-created duing soctools installation).
Many other related changes.
parent aec9ff38
No related branches found
No related tags found
No related merge requests found
# Various constants and parameters # Global configuration parameters
# Some are filled during initialization by load_config() in main.py
NIFI_CONTAINERS = [ import os.path
'soctools-nifi-1',
'soctools-nifi-2', # *** Configuration of file paths ***
'soctools-nifi-3', SOCTOOLS_BASE = ".." # path to the root of soctools files
] VARIABLES_FILE = os.path.join(SOCTOOLS_BASE, "group_vars/all/variables.yml")
CA_CERT_FILE = os.path.join(SOCTOOLS_BASE, "secrets/CA/ca.crt")
KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/keykloak_admin") # Note: should be keycloak, not keykloak
# Credentials of the special user for account management
# Cert and key should be in .pem format, unencrypted
MGMT_USER_NAME = "soctools-user-mgmt"
MGMT_USER_CERT_PATH = os.path.join(SOCTOOLS_BASE, "secrets/CA/issued/soctools-user-mgmt.crt")
MGMT_USER_KEY_PATH = os.path.join(SOCTOOLS_BASE, "secrets/CA/private/soctools-user-mgmt.key")
# TODO FIXME "SOC_Admin" used instead for initial testing
# MGMT_USER_NAME = "SOC_Admin"
# MGMT_USER_CERT_PATH = os.path.join(SOCTOOLS_BASE, "secrets/certificates/SOC_Admin.crt.pem")
# MGMT_USER_KEY_PATH = os.path.join(SOCTOOLS_BASE, "secrets/certificates/SOC_Admin.key.pem")
# Following parameters are set up dynamically by load_config() in main.py
SOCTOOLSPROXY = None
KEYCLOAK_BASE_URL = None
KEYCLOAK_USERS_URL = None
KEYCLOAK_ADMIN_PASSWORD = None
...@@ -15,32 +15,26 @@ import yaml ...@@ -15,32 +15,26 @@ import yaml
from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6 from dataclasses import dataclass, field # external package - backport of Py3.7 dataclasses to Py3.6
from nifi import * from nifi import *
import config
app = Flask(__name__) app = Flask(__name__)
app.secret_key = "ASDF1234 - CHANGE ME!" app.secret_key = "ASDF1234 - CHANGE ME!"
# *** Configuration of file paths ***
SOCTOOLS_BASE = ".." # path to the root of soctools files
VARIABLES_FILE = os.path.join(SOCTOOLS_BASE, "group_vars/all/variables.yml")
CA_CERT_FILE = os.path.join(SOCTOOLS_BASE, "secrets/CA/ca.crt")
KEYCLOAK_ADMIN_PASSWORD_FILE = os.path.join(SOCTOOLS_BASE, "secrets/passwords/keykloak_admin") # Note: should be keycloak, not keykloak
@app.before_first_request @app.before_first_request
def load_config(): def load_config():
"""Load various variables, api keys, etc. and set configuration parameters""" """Load various variables, api keys, etc. and set configuration parameters"""
global SOCTOOLSPROXY, KEYCLOAK_BASE_URL, KEYCLOAK_USERS_URL, KEYCLOAK_ADMIN_PASSWORD variables = yaml.safe_load(open(config.VARIABLES_FILE, "r"))
variables = yaml.safe_load(open(VARIABLES_FILE, "r"))
# Get FQDN of the main server # Get FQDN of the main server
SOCTOOLSPROXY = variables["soctoolsproxy"] config.SOCTOOLSPROXY = variables["soctoolsproxy"]
assert re.match('[a-zA-Z0-9.-]+', SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{VARIABLES_FILE}' is not a valid domain name." assert re.match('[a-zA-Z0-9.-]+', config.SOCTOOLSPROXY), f"ERROR: The 'soctoolsproxy' variable loaded from '{config.VARIABLES_FILE}' is not a valid domain name."
# Set base URL to Keycloak # Set base URL to Keycloak
KEYCLOAK_BASE_URL = f"https://{SOCTOOLSPROXY}:12443" config.KEYCLOAK_BASE_URL = f"https://{config.SOCTOOLSPROXY}:12443"
KEYCLOAK_USERS_URL = KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users" config.KEYCLOAK_USERS_URL = config.KEYCLOAK_BASE_URL + "/auth/admin/realms/SOCTOOLS1/users"
# Load API key for Keycloak # Load API key for Keycloak
KEYCLOAK_ADMIN_PASSWORD = open(KEYCLOAK_ADMIN_PASSWORD_FILE, "r").read(100).strip() # read max 100 B, the key should never be so long config.KEYCLOAK_ADMIN_PASSWORD = open(config.KEYCLOAK_ADMIN_PASSWORD_FILE, "r").read(100).strip() # read max 100 B, the key should never be so long
print(f"Config loaded:\nSOCTOOLSPROXY={SOCTOOLSPROXY}\nKEYCLOAK_BASE_URL={KEYCLOAK_BASE_URL}\n" print(f"Config loaded:\nSOCTOOLSPROXY={config.SOCTOOLSPROXY}\nKEYCLOAK_BASE_URL={config.KEYCLOAK_BASE_URL}\n"
f"KEYCLOAK_ADMIN_PASSWORD={KEYCLOAK_ADMIN_PASSWORD[:3]}...{KEYCLOAK_ADMIN_PASSWORD[-4:]}") f"KEYCLOAK_ADMIN_PASSWORD={config.KEYCLOAK_ADMIN_PASSWORD[:3]}...{config.KEYCLOAK_ADMIN_PASSWORD[-4:]}")
# *** Custom Jinja filters *** # *** Custom Jinja filters ***
...@@ -63,6 +57,7 @@ class UserAccount: ...@@ -63,6 +57,7 @@ class UserAccount:
kcid: Optional[str] = field(default=None) # keycloak ID kcid: Optional[str] = field(default=None) # keycloak ID
ts_created: Optional[datetime] = field(default=None) # timezone-aware datetime in UTC ts_created: Optional[datetime] = field(default=None) # timezone-aware datetime in UTC
components: Optional[Dict[str, bool]] = field(default_factory=dict) # Presence of the account in SOCtools components that don't use Keycloak directly (NiFi, MISP, TheHive, ...) components: Optional[Dict[str, bool]] = field(default_factory=dict) # Presence of the account in SOCtools components that don't use Keycloak directly (NiFi, MISP, TheHive, ...)
internal: bool = False
def to_keycloak_representation(self) -> Dict: def to_keycloak_representation(self) -> Dict:
""" """
...@@ -85,9 +80,9 @@ class UserAccount: ...@@ -85,9 +80,9 @@ class UserAccount:
@classmethod @classmethod
def from_keycloak_representation(cls, kc_user: dict) -> "UserAccount": def from_keycloak_representation(cls, kc_user: dict) -> "UserAccount":
try: try:
return cls(kc_user['username'], kc_user['email'], kc_user['firstName'], kc_user['lastName'], return cls(kc_user['username'], kc_user.get('email', ''), kc_user.get('firstName', ''),
kc_user['attributes'].get('CN',[''])[0], kc_user['attributes'].get('DN',[''])[0], kc_user.get('lastName', ''), kc_user.get('attributes', {}).get('CN',[''])[0],
kc_user['id'], kc_user.get('attributes', {}).get('DN',[''])[0], kc_user['id'],
datetime.utcfromtimestamp(int(kc_user['createdTimestamp']/1000)).replace(tzinfo=timezone.utc)) datetime.utcfromtimestamp(int(kc_user['createdTimestamp']/1000)).replace(tzinfo=timezone.utc))
except KeyError as e: except KeyError as e:
raise KeycloakError(f"User representation received from Keycloak is missing attribute '{e}'") raise KeycloakError(f"User representation received from Keycloak is missing attribute '{e}'")
...@@ -104,15 +99,15 @@ def kc_get_token() -> str: ...@@ -104,15 +99,15 @@ def kc_get_token() -> str:
Return the token or raise KeycloakError Return the token or raise KeycloakError
""" """
url = KEYCLOAK_BASE_URL + "/auth/realms/master/protocol/openid-connect/token" url = config.KEYCLOAK_BASE_URL + "/auth/realms/master/protocol/openid-connect/token"
data = { data = {
"client_id": "admin-cli", "client_id": "admin-cli",
"username": "admin", "username": "admin",
"password": KEYCLOAK_ADMIN_PASSWORD, "password": config.KEYCLOAK_ADMIN_PASSWORD,
"grant_type": "password" "grant_type": "password"
} }
try: try:
resp = requests.post(url, data, verify=CA_CERT_FILE) resp = requests.post(url, data, verify=config.CA_CERT_FILE)
if resp.status_code != 200: if resp.status_code != 200:
raise KeycloakError(f"Can't get OIDC token for API access: ({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"Can't get OIDC token for API access: ({resp.status_code}) {resp.text[:200]}")
return str(resp.json()['access_token']) return str(resp.json()['access_token'])
...@@ -127,7 +122,7 @@ def kc_get_users() -> List[UserAccount]: ...@@ -127,7 +122,7 @@ def kc_get_users() -> List[UserAccount]:
:raise KeycloakError :raise KeycloakError
""" """
token = kc_get_token() token = kc_get_token()
resp = requests.get(KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) resp = requests.get(config.KEYCLOAK_USERS_URL, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok: if not resp.ok:
raise KeycloakError(f"Can't get list of users: ({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"Can't get list of users: ({resp.status_code}) {resp.text[:200]}")
try: try:
...@@ -147,8 +142,8 @@ def kc_get_user_by_id(userid: str) -> UserAccount: ...@@ -147,8 +142,8 @@ def kc_get_user_by_id(userid: str) -> UserAccount:
""" """
assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID" assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID"
token = kc_get_token() token = kc_get_token()
url = KEYCLOAK_USERS_URL + "/" + userid url = config.KEYCLOAK_USERS_URL + "/" + userid
resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok: if not resp.ok:
raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}")
try: try:
...@@ -167,8 +162,8 @@ def kc_get_user_by_name(username: str) -> Optional[UserAccount]: ...@@ -167,8 +162,8 @@ def kc_get_user_by_name(username: str) -> Optional[UserAccount]:
:raise KeycloakError :raise KeycloakError
""" """
token = kc_get_token() token = kc_get_token()
url = KEYCLOAK_USERS_URL url = config.KEYCLOAK_USERS_URL
resp = requests.get(url, params={'username': username, 'exact': 'true'}, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) resp = requests.get(url, params={'username': username, 'exact': 'true'}, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok: if not resp.ok:
raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"Can't get user info: ({resp.status_code}) {resp.text[:200]}")
print(resp.text) print(resp.text)
...@@ -192,8 +187,8 @@ def kc_add_user(user: UserAccount) -> None: ...@@ -192,8 +187,8 @@ def kc_add_user(user: UserAccount) -> None:
user_data = user.to_keycloak_representation() user_data = user.to_keycloak_representation()
user_data["enabled"] = True # add "enable" key, since a new user must be explicitly enabled (default is False) user_data["enabled"] = True # add "enable" key, since a new user must be explicitly enabled (default is False)
resp = requests.post(KEYCLOAK_USERS_URL, json=user_data, resp = requests.post(config.KEYCLOAK_USERS_URL, json=user_data,
headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok: if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
...@@ -208,9 +203,9 @@ def kc_update_user(user: UserAccount) -> None: ...@@ -208,9 +203,9 @@ def kc_update_user(user: UserAccount) -> None:
token = kc_get_token() token = kc_get_token()
user_data = user.to_keycloak_representation() user_data = user.to_keycloak_representation()
url = KEYCLOAK_USERS_URL + "/" + user.kcid url = config.KEYCLOAK_USERS_URL + "/" + user.kcid
resp = requests.put(url, json=user_data, resp = requests.put(url, json=user_data,
headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok: if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
...@@ -223,8 +218,8 @@ def kc_delete_user(userid: str) -> None: ...@@ -223,8 +218,8 @@ def kc_delete_user(userid: str) -> None:
""" """
assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID" assert re.match(r'[0-9a-z-]*', userid), "Invalid user ID"
token = kc_get_token() token = kc_get_token()
url = KEYCLOAK_USERS_URL + "/" + userid url = config.KEYCLOAK_USERS_URL + "/" + userid
resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=CA_CERT_FILE) resp = requests.delete(url, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok: if not resp.ok:
raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}") raise KeycloakError(f"({resp.status_code}) {resp.text[:200]}")
...@@ -251,6 +246,10 @@ def main(): ...@@ -251,6 +246,10 @@ def main():
except KeycloakError as e: except KeycloakError as e:
flash(f"ERROR: {e}", "error") flash(f"ERROR: {e}", "error")
users = [] users = []
# Mark "internal" users
for u in users:
if u.username == config.MGMT_USER_NAME:
u.internal = True
#print(users) #print(users)
# Load NiFi users # Load NiFi users
...@@ -259,6 +258,11 @@ def main(): ...@@ -259,6 +258,11 @@ def main():
except NifiError as e: except NifiError as e:
flash(f"ERROR: {e}", "error") flash(f"ERROR: {e}", "error")
nifi_users = [] nifi_users = []
# Mark "internal" users
for u in nifi_users:
if u["name"].startswith("CN=soctools-nifi-") or u["name"] == config.MGMT_USER_NAME:
u["internal"] = True
# List of usernames only (for easier cross-check with Keycloak users)
nifi_usernames = set(nu["name"] for nu in nifi_users) nifi_usernames = set(nu["name"] for nu in nifi_users)
return render_template("main.html", **locals()) return render_template("main.html", **locals())
......
"""Functions to manage user accounts in NiFi""" """Functions to manage user accounts in NiFi"""
from typing import List, Dict, Optional from typing import List, Dict, Optional
import subprocess import requests
import xml.etree.ElementTree as ET import re
from operator import itemgetter
from config import * import config
config.SOCTOOLSPROXY = "gn4soctools3.liberouter.org"
# Path to user configuration in NiFi containers # URL to initial login process
NIFI_USER_CONFIG_PATH = "/opt/nifi/nifi-current/conf/users.xml" NIFI_LOGIN_URL = "https://{soctools_proxy}:9443/nifi/login"
# Base URL to NiFi API endpoints
# Shell command to restart NiFi in the container (simple "supervisorctl restart" doesn't work, since there is another NIFI_API_BASE_URL = "https://{soctools_proxy}:9443/nifi-api"
# nifi process which supervisord doesn't see and which stops only after some time after the main one; so we need to
# wait until it stops as well by calling "ps" in a loop)
#NIFI_RESTART_COMMAND = "supervisorctl stop nifi ; while (ps aux | grep '^nifi' >/dev/null); do sleep 1; done; supervisorctl start nifi"
NIFI_RESTART_COMMAND = "bin/nifi.sh stop" # stop properly by sending a stop command. It is then restarted automatically by supervisord.
# NiFi API documentation: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
class NifiError(Exception): class NifiError(Exception):
pass pass
...@@ -25,229 +24,113 @@ class NifiUserNotFoundError(NifiError): ...@@ -25,229 +24,113 @@ class NifiUserNotFoundError(NifiError):
class NifiUserExistsError(NifiError): class NifiUserExistsError(NifiError):
pass pass
# For reference, an example NiFi user-config file looks like this: class NifiUnexpectedReplyError(NifiError):
""" pass
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<tenants>
<groups>
<group identifier="c78caf19-016f-1000-0000-000000000001" name="NiFi nodes">
<user identifier="c78caf19-016f-1000-0001-000000000001"/>
<user identifier="c78caf19-016f-1000-0001-000000000002"/>
<user identifier="c78caf19-016f-1000-0001-000000000003"/>
</group>
<group identifier="c78caf19-016f-1000-0000-000000000002" name="Administrators">
<user identifier="c78caf19-016f-1000-0002-000000000001"/>
<user identifier="c78caf19-016f-1000-0002-000000000002"/>
</group>
</groups>
<users>
<user identifier="c78caf19-016f-1000-0001-000000000001" identity="CN=soctools-nifi-1"/>
<user identifier="c78caf19-016f-1000-0001-000000000002" identity="CN=soctools-nifi-2"/>
<user identifier="c78caf19-016f-1000-0001-000000000003" identity="CN=soctools-nifi-3"/>
<user identifier="c78caf19-016f-1000-0002-000000000001" identity="user1"/>
<user identifier="c78caf19-016f-1000-0002-000000000002" identity="user2"/>
</users>
</tenants>
"""
def _nifi_xml_get_users(config_xml: str) -> List[Dict]:
"""Parse the XML file and return list of users.
Return: list of dicts with keys 'name', 'id', 'group'
"""
config_root = ET.fromstring(config_xml)
# Load groups and store group name for each user ID
user_to_group = {} # dict: user_id -> group_name
for group_elem in config_root.find("groups").findall("group"):
g_name = group_elem.get("name")
for user_elem in group_elem.findall("user"):
user_to_group[user_elem.get("identifier")] = g_name
# Load users, find the group for each one
users = [] # list of dicts with keys 'name' (identity), 'id' (identifier), 'group'
for user_elem in config_root.findall("./users/user"):
users.append({
"name": user_elem.get("identity"),
"id": user_elem.get("identifier"),
"group": user_to_group[user_elem.get("identifier")],
})
return users
def _nifi_xml_add_user(config_xml: str, user_name: str, user_group: str) -> str: def _nifi_get_jwt() -> str:
"""Add given user to the XML config file. """
Get OIDC token (JWT) for authenticating API requests
Assumes that "user_group" already exists in the file. Simulate standard login process like a user would do via browser.
It would be better to use "Resource Owner Password Credentials Grant" flow (or "Direct Access Grant" as Keycloak
calls it), which is more suitable for machine clients. However, when such flow is used, Keycloak generates
a differently formatted JWT token (don't know why), which NiFi doesn't accept.
I tried everything to make it work, but unsuccessfully. Therefore, the standard flow is used, which is more
complicated, but works whenever the normal user login works.
:return updated xml string :return JWT token
:raise NifiUnexpectedReplyError
""" """
config_root = ET.fromstring(config_xml) # We will need to store some cookies - create a session which will automatically handle it
# Get info about the given group # Also, set path to certificates
group_node = config_root.find(f"./groups/group[@name='{user_group}']") session = requests.Session()
group_user_nodes = group_node.findall("user") session.verify = config.CA_CERT_FILE
# Get list of users session.cert = (config.MGMT_USER_CERT_PATH, config.MGMT_USER_KEY_PATH)
users_node = config_root.find("./users")
# check that there is not a user with the same username # Initiate login process by querying the NiFi login page.
if any(u.get("name") == user_name for u in users_node): # NiFi should set the 'nifi-oidc-request-identifier' cookie (stored into session, will be needed later) and
raise NifiUserExistsError(f"Username '{user_name}' already exists!") # redirect us to Keycloak. The redirection is automatically followed by requests.get().
# Keycloak should authenticate us using the provided certificate and present a web page with confirmation form.
# Generate new user identifier as the max id of the group plus one url = NIFI_LOGIN_URL.format(soctools_proxy=config.SOCTOOLSPROXY)
# ids look like: abcd1234-01ab-1000-0002-000000000001 (the last part seems to be the index of the user within the group) resp = session.get(url, allow_redirects=True)
user_ids = [u.get("identifier") for u in group_user_nodes] if not resp.ok:
max_id = max(user_ids) raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
group_id,last_user_id = max_id.rsplit("-", 1) # Parse the returned web page, find the form with id="kc-x509-login-info" and takes URL from its "action" attribute.
new_id = f"{group_id}-{int(last_user_id)+1:012d}" re_get_from_url = r'<form [^>]*id="kc-x509-login-info"[^>]* action="([^"]*)"'
match = re.search(re_get_from_url, resp.text)
# Add a new element the list of users in the group and the list of users if not match:
group_node.append(ET.Element("user", identifier=new_id)) # try to get error message
users_node.append(ET.Element("user", identifier=new_id, identity=user_name)) match2 = re.search(r'<span class="kc-feedback-text">(.*?)</span>', resp.text)
if match2:
return ET.tostring(config_root, encoding='utf-8') raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP reply content (can't find the x509 login form). It contains the following, probably an error message: {match2.group(1)}.")
else:
def _nifi_xml_delete_user(config_xml: str, user_name: str) -> str: raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP reply content (can't find the x509 login from). Queried URL: {resp.url}")
"""Remove given user from the XML config file. url = match.group(1)
url = url.replace("&amp;", "&")
Assumes that "user_group" already exists in the file. # Send POST request to the URL to simulate clicking the "Continue" button
# Keycloak should redirect us to NiFi's callback URL. Requests automatically follow this redirection, so we should
:return updated xml string # receive 200 OK from NiFi, whose content is the JWT
""" data = "login=Continue"
config_root = ET.fromstring(config_xml) headers = {"Content-Type": "application/x-www-form-urlencoded"}
# Find user with given name resp = session.post(url, data=data, headers=headers, allow_redirects=True)
user_node = config_root.find(f"./users/user[@identity='{user_name}']") if not resp.ok:
if user_node is None: raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
raise NifiUserNotFoundError(f"Can't delete user '{user_name}' from NiFi: User with such a name doesn't exist.")
# Get user's numerical id # Now, we are authenticated to NiFi, identified by a cookie (stored within our session object).
identifier = user_node.get('identifier') # Use the cookie and ask for the JWT token we need for API requests
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/access/oidc/exchange"
# Remove the user from <users> and from any groups print(f"_nifi_get_jwt: POST request to: {url}")
print("identifier:", identifier) resp = session.post(url) # POST must be used even though no data are being sent
print("user_node:", user_node) if not resp.ok:
config_root.find("./users").remove(user_node) raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
for group_node in config_root.findall("./groups/"): return resp.text
print("group_node:", group_node)
user_node = group_node.find(f"user[@identifier='{identifier}']")
print(" user_node:", user_node)
if user_node is not None:
group_node.remove(user_node)
return ET.tostring(config_root, encoding='utf-8')
def _nifi_load_user_config(cont_name: str) -> str:
"""Get the current user-config file from a NiFi docker container"""
#print("Getting NiFi config...")
result = subprocess.run(["docker", "exec", cont_name, "cat", NIFI_USER_CONFIG_PATH],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
config_xml = result.stdout.decode('utf-8')
#print(config_xml)
return config_xml
else:
raise NifiError(f'Error when trying to get the current config of NiFi users from container "{cont_name}": {result.stderr.decode()}')
def _nifi_write_user_config(new_xml: str):
"""Write given XML string to the user-config file in all NiFi containers, restart NiFi in all containers."""
# Write new file into all containers
for i,cont_name in enumerate(NIFI_CONTAINERS):
# Run a command to write the new user-config file inside the NiFi docker container
# The file contents are passed via stdin ("docker exec -i" must be used for stdin to work), "tee" is used to
# store it to a file (stdout is ignored)
print(f'Writing new NiFi user config in container "{cont_name}"')
result = subprocess.run(["docker", "exec", "-i", cont_name, "tee", NIFI_USER_CONFIG_PATH],
input=new_xml, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
raise NifiError(f'Error when trying to write the updated list of NiFi users into container "{cont_name}": {result.stderr.decode()}')
# Restart all containers (may take a long time!)
for i,cont_name in enumerate(NIFI_CONTAINERS):
print(f'Restarting NiFi in container "{cont_name}"')
result = subprocess.run(["docker", "exec", cont_name, "bash", "-c", NIFI_RESTART_COMMAND],
stderr=subprocess.PIPE)
if result.returncode != 0:
raise NifiError(f'Error when trying to restart NiFi after config update in container "{cont_name}": {result.stderr.decode()}')
def nifi_get_users() -> List[Dict]: def nifi_get_users() -> List[Dict]:
""" """
List users defined in NiFi List users defined in NiFi
:return List of dicts with keys 'id', 'name', 'groups' (list of group names)
:raise NifiUnexpectedReplyError
""" """
prev_users = None url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users"
for i,cont_name in enumerate(NIFI_CONTAINERS): token = _nifi_get_jwt()
# Get the current user-config file resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
config_xml = _nifi_load_user_config(cont_name) if not resp.ok:
if config_xml is False: raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected reply {resp.status_code}")
return [] users = []
try:
# Parse the list of users from the config file raw_users = resp.json()['users']
try: #print(raw_users)
users = _nifi_xml_get_users(config_xml) for user in raw_users:
except Exception as e: users.append({
raise NifiError(f'Can\'t parse NiFi user config file ("{NIFI_USER_CONFIG_PATH}" in container "{cont_name}"): {e}') 'id': user["component"]["id"],
'name': user["component"]["identity"],
# Remove "internal" users from the "NiFi nodes" group 'groups': [g["component"]["identity"] for g in user["component"]["userGroups"]],
users = [u for u in users if u["group"] != "NiFi nodes"] })
# TODO this way, the comparison forces the order of users to be the same, which is not needed (although users.sort(key=itemgetter('name'))
# it is always the same in normal situation) #print(users)
# Check that the list is the same as in the previous container (all should be the same) except (ValueError, TypeError, KeyError):
if prev_users is not None and users != prev_users: raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected content received")
raise NifiError('Error when trying to get the list of NiFi users: The lists of users differ in at least ' return users
f'two NiFi nodes ({NIFI_CONTAINERS[i-1]},{NIFI_CONTAINERS[i]}). Check the file '
f'"{NIFI_USER_CONFIG_PATH}" in each NiFi container, they all must be the same.')
prev_users = users
return prev_users
def nifi_add_user(user: 'UserAccount'): def nifi_add_user(user: 'UserAccount'):
"""Add a new user to NiFi """Add a new user to NiFi
Read user config from a NiFi container (the first one), add a new user it, and write it in all containers (overwrite!).
:raises NifiError :raises NifiError
""" """
user_name = user.username user_name = user.username
user_group = "Administrators" # no support for other groups in NiFi, yet user_group = "Administrators" # no support for other groups in NiFi, yet
# Get the current user-config file (use the first container, the content should be the same in all) # TODO
cont_name = NIFI_CONTAINERS[0]
config_xml = _nifi_load_user_config(cont_name)
#Add new user to the XML
try:
new_xml = _nifi_xml_add_user(config_xml, user_name, user_group)
except NifiUserExistsError:
raise
except Exception as e:
raise NifiError(f'Can\'t add user to the config file ("{NIFI_USER_CONFIG_PATH}" in container "{cont_name}"): {e}')
#print("XML with added user:")
#print(new_xml)
# Write the updated config and restart NiFi nodes
_nifi_write_user_config(new_xml)
def nifi_delete_user(user_name: str): def nifi_delete_user(user_name: str):
"""Add a new user to NiFi """Delete a user from NiFi
Read user config from a NiFi container (the first one), remove the specified user, and write new version in all containers (overwrite!).
:raises NifiError :raises NifiError
""" """
# Get the current user-config file (use the first container, the content should be the same in all)
cont_name = NIFI_CONTAINERS[0]
config_xml = _nifi_load_user_config(cont_name)
# Remove user from the XML
try:
new_xml = _nifi_xml_delete_user(config_xml, user_name)
except NifiUserNotFoundError:
raise
except Exception as e:
raise NifiError(f'Can\'t remove user from config file ("{NIFI_USER_CONFIG_PATH}" in container "{cont_name}"): {e}')
#print("XML with deleted user:")
#print(new_xml)
# Write the updated config and restart NiFi nodes
_nifi_write_user_config(new_xml)
# TODO
...@@ -42,4 +42,8 @@ li.flash-success { ...@@ -42,4 +42,8 @@ li.flash-success {
input[readonly] { input[readonly] {
background-color: #ddd; background-color: #ddd;
} }
\ No newline at end of file
.internal-user {
color: #ccc;
}
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
<tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th>NiFi</th><th></th> <tr><th>Username</th><th>First name</th><th>Last name</th><th>email</th><th>CN</th><th>DN</th><th>Time created (UTC)</th><th>NiFi</th><th></th>
{% for user in users %} {% for user in users %}
<tr> <tr{% if user.internal %} class="internal-user"{% endif %}>
<td>{{ user.username }}</td> <td>{{ user.username }}</td>
<td>{{ user.firstname }}</td> <td>{{ user.firstname }}</td>
<td>{{ user.lastname }}</td> <td>{{ user.lastname }}</td>
...@@ -17,9 +17,11 @@ ...@@ -17,9 +17,11 @@
<td>{{ user.ts_created.isoformat() }}</td> <td>{{ user.ts_created.isoformat() }}</td>
<td>{{ icon('check' if user.username in nifi_usernames else 'close') }}</td> <td>{{ icon('check' if user.username in nifi_usernames else 'close') }}</td>
<td> <td>
{% if not user.internal -%}
<a href="{{ url_for('edit_user', username=user.username) }}" title="Edit user">{{ icon('pencil') }}</a> <a href="{{ url_for('edit_user', username=user.username) }}" title="Edit user">{{ icon('pencil') }}</a>
<a href="{{ url_for('delete_user', username=user.username) }}" title="Delete user" <a href="{{ url_for('delete_user', username=user.username) }}" title="Delete user"
onclick="return confirm('Are you sure you want to permanently delete user account &quot;{{user.username}}&quot; ({{user.cn}}, {{user.email}})?')">{{ icon('trash') }}</a> onclick="return confirm('Are you sure you want to permanently delete user account &quot;{{user.username}}&quot; ({{user.cn}}, {{user.email}})?')">{{ icon('trash') }}</a>
{%- endif %}
</td> </td>
</tr> </tr>
{#<tr><td colspan=8>{{ user }}</td></tr>#} {#<tr><td colspan=8>{{ user }}</td></tr>#}
...@@ -33,9 +35,9 @@ ...@@ -33,9 +35,9 @@
<tr><th>Username</th><th>Group</th><th>ID</th> <tr><th>Username</th><th>Group</th><th>ID</th>
{% for user in nifi_users %} {% for user in nifi_users %}
<tr> <tr{% if user.internal %} class="internal-user"{% endif %}>
<td>{{ user.name }}</td> <td>{{ user.name }}</td>
<td>{{ user.group }}</td> <td>{{ user.groups|join(',') }}</td>
<td>{{ user.id }}</td> <td>{{ user.id }}</td>
</tr> </tr>
{% endfor %} {% endfor %}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment