-
Václav Bartoš authored
API is authenticated via token requested from Keycloak by simulating normal user login. A special user account "soctools-user-mgmt" is used to manage user in NiFi (this must be pre-created duing soctools installation). Many other related changes.
Václav Bartoš authoredAPI is authenticated via token requested from Keycloak by simulating normal user login. A special user account "soctools-user-mgmt" is used to manage user in NiFi (this must be pre-created duing soctools installation). Many other related changes.
nifi.py 5.71 KiB
"""Functions to manage user accounts in NiFi"""
from typing import List, Dict, Optional
import requests
import re
from operator import itemgetter
import config
config.SOCTOOLSPROXY = "gn4soctools3.liberouter.org"
# URL to initial login process
NIFI_LOGIN_URL = "https://{soctools_proxy}:9443/nifi/login"
# Base URL to NiFi API endpoints
NIFI_API_BASE_URL = "https://{soctools_proxy}:9443/nifi-api"
# NiFi API documentation: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
class NifiError(Exception):
pass
class NifiUserNotFoundError(NifiError):
pass
class NifiUserExistsError(NifiError):
pass
class NifiUnexpectedReplyError(NifiError):
pass
def _nifi_get_jwt() -> str:
"""
Get OIDC token (JWT) for authenticating API requests
Simulate standard login process like a user would do via browser.
It would be better to use "Resource Owner Password Credentials Grant" flow (or "Direct Access Grant" as Keycloak
calls it), which is more suitable for machine clients. However, when such flow is used, Keycloak generates
a differently formatted JWT token (don't know why), which NiFi doesn't accept.
I tried everything to make it work, but unsuccessfully. Therefore, the standard flow is used, which is more
complicated, but works whenever the normal user login works.
:return JWT token
:raise NifiUnexpectedReplyError
"""
# We will need to store some cookies - create a session which will automatically handle it
# Also, set path to certificates
session = requests.Session()
session.verify = config.CA_CERT_FILE
session.cert = (config.MGMT_USER_CERT_PATH, config.MGMT_USER_KEY_PATH)
# Initiate login process by querying the NiFi login page.
# NiFi should set the 'nifi-oidc-request-identifier' cookie (stored into session, will be needed later) and
# redirect us to Keycloak. The redirection is automatically followed by requests.get().
# Keycloak should authenticate us using the provided certificate and present a web page with confirmation form.
url = NIFI_LOGIN_URL.format(soctools_proxy=config.SOCTOOLSPROXY)
resp = session.get(url, allow_redirects=True)
if not resp.ok:
raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
# Parse the returned web page, find the form with id="kc-x509-login-info" and takes URL from its "action" attribute.
re_get_from_url = r'<form [^>]*id="kc-x509-login-info"[^>]* action="([^"]*)"'
match = re.search(re_get_from_url, resp.text)
if not match:
# try to get error message
match2 = re.search(r'<span class="kc-feedback-text">(.*?)</span>', resp.text)
if match2:
raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP reply content (can't find the x509 login form). It contains the following, probably an error message: {match2.group(1)}.")
else:
raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP reply content (can't find the x509 login from). Queried URL: {resp.url}")
url = match.group(1)
url = url.replace("&", "&")
# Send POST request to the URL to simulate clicking the "Continue" button
# Keycloak should redirect us to NiFi's callback URL. Requests automatically follow this redirection, so we should
# receive 200 OK from NiFi, whose content is the JWT
data = "login=Continue"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
resp = session.post(url, data=data, headers=headers, allow_redirects=True)
if not resp.ok:
raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
# Now, we are authenticated to NiFi, identified by a cookie (stored within our session object).
# Use the cookie and ask for the JWT token we need for API requests
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/access/oidc/exchange"
print(f"_nifi_get_jwt: POST request to: {url}")
resp = session.post(url) # POST must be used even though no data are being sent
if not resp.ok:
raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
return resp.text
def nifi_get_users() -> List[Dict]:
"""
List users defined in NiFi
:return List of dicts with keys 'id', 'name', 'groups' (list of group names)
:raise NifiUnexpectedReplyError
"""
url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users"
token = _nifi_get_jwt()
resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
if not resp.ok:
raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected reply {resp.status_code}")
users = []
try:
raw_users = resp.json()['users']
#print(raw_users)
for user in raw_users:
users.append({
'id': user["component"]["id"],
'name': user["component"]["identity"],
'groups': [g["component"]["identity"] for g in user["component"]["userGroups"]],
})
users.sort(key=itemgetter('name'))
#print(users)
except (ValueError, TypeError, KeyError):
raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected content received")
return users
def nifi_add_user(user: 'UserAccount'):
"""Add a new user to NiFi
:raises NifiError
"""
user_name = user.username
user_group = "Administrators" # no support for other groups in NiFi, yet
# TODO
def nifi_delete_user(user_name: str):
"""Delete a user from NiFi
:raises NifiError
"""
# TODO