Skip to content
Snippets Groups Projects
  • Václav Bartoš's avatar
    26a8fced
    NiFi user management rewritten to use NiFi API · 26a8fced
    Václav Bartoš authored
    API is authenticated via token requested from Keycloak by simulating normal user login.
    A special user account "soctools-user-mgmt" is used to manage users in NiFi (this must be pre-created during soctools installation).
    Many other related changes.
    26a8fced
    History
    NiFi user management rewritten to use NiFi API
    Václav Bartoš authored
    API is authenticated via token requested from Keycloak by simulating normal user login.
    A special user account "soctools-user-mgmt" is used to manage users in NiFi (this must be pre-created during soctools installation).
    Many other related changes.
nifi.py 5.71 KiB
"""Functions to manage user accounts in NiFi"""

from typing import List, Dict, Optional
import requests
import re
from operator import itemgetter

import config
# NOTE(review): this assignment overwrites config.SOCTOOLSPROXY as soon as this module is imported, using
# a hard-coded host name. It looks like a development/testing leftover - TODO confirm and remove, so that
# the value coming from the configuration is used instead.
config.SOCTOOLSPROXY = "gn4soctools3.liberouter.org"

# URL to initial login process (the "{soctools_proxy}" placeholder is filled with config.SOCTOOLSPROXY)
NIFI_LOGIN_URL = "https://{soctools_proxy}:9443/nifi/login"
# Base URL to NiFi API endpoints (the "{soctools_proxy}" placeholder is filled with config.SOCTOOLSPROXY)
NIFI_API_BASE_URL = "https://{soctools_proxy}:9443/nifi-api"

# NiFi API documentation: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

class NifiError(Exception):
    """Base class of the errors raised by the NiFi user management functions in this module"""

class NifiUserNotFoundError(NifiError):
    """A user was not found in NiFi (judging by the name; not raised by the code visible in this file yet)"""

class NifiUserExistsError(NifiError):
    """A user already exists in NiFi (judging by the name; not raised by the code visible in this file yet)"""

class NifiUnexpectedReplyError(NifiError):
    """An HTTP reply with unexpected status code or unexpected content was received during login or API call"""

def _nifi_get_jwt() -> str:
    """
    Get OIDC token (JWT) for authenticating API requests

    Simulate standard login process like a user would do via browser.
    It would be better to use "Resource Owner Password Credentials Grant" flow (or "Direct Access Grant" as Keycloak
    calls it), which is more suitable for machine clients. However, when such flow is used, Keycloak generates
    a differently formatted JWT token (don't know why), which NiFi doesn't accept.
    I tried everything to make it work, but unsuccessfully. Therefore, the standard flow is used, which is more
    complicated, but works whenever the normal user login works.

    The login consists of three HTTP requests sharing one session (cookies):
      1. GET of the NiFi login page (redirected to Keycloak, which returns a confirmation form),
      2. POST to the "action" URL of that form (simulates clicking "Continue", redirected back to NiFi),
      3. POST to the "/access/oidc/exchange" endpoint of NiFi API, whose reply body is the JWT.
    The session is closed before the function returns or raises.

    Errors raised by the 'requests' library itself (e.g. connection errors) are not caught here.

    :return JWT token
    :raise NifiUnexpectedReplyError if any of the requests returns an error status code or the login form can't be
        found in the page returned by Keycloak
    """
    # We will need to store some cookies - create a session which will automatically handle it.
    # The session is used as a context manager, so its pooled connections are released on every exit path
    # (normal return as well as any of the exceptions raised below).
    with requests.Session() as session:
        # Set path to certificates
        session.verify = config.CA_CERT_FILE
        session.cert = (config.MGMT_USER_CERT_PATH, config.MGMT_USER_KEY_PATH)

        # Initiate login process by querying the NiFi login page.
        # NiFi should set the 'nifi-oidc-request-identifier' cookie (stored into session, will be needed later) and
        # redirect us to Keycloak. The redirection is automatically followed by session.get().
        # Keycloak should authenticate us using the provided certificate and present a web page with confirmation form.
        url = NIFI_LOGIN_URL.format(soctools_proxy=config.SOCTOOLSPROXY)
        resp = session.get(url, allow_redirects=True)
        if not resp.ok:
            raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")

        # Parse the returned web page, find the form with id="kc-x509-login-info" and take URL from its "action"
        # attribute.
        re_get_form_url = r'<form [^>]*id="kc-x509-login-info"[^>]* action="([^"]*)"'
        match = re.search(re_get_form_url, resp.text)
        if not match:
            # Form not found - try to get an error message from the page to make the exception more useful
            feedback = re.search(r'<span class="kc-feedback-text">(.*?)</span>', resp.text)
            if feedback:
                raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP reply content (can't find the x509 login form). It contains the following, probably an error message: {feedback.group(1)}.")
            raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP reply content (can't find the x509 login from). Queried URL: {resp.url}")
        # The URL is taken from an HTML attribute, so ampersands in it are escaped - unescape them
        url = match.group(1).replace("&amp;", "&")

        # Send POST request to the URL to simulate clicking the "Continue" button
        # Keycloak should redirect us to NiFi's callback URL. Requests automatically follow this redirection, so we
        # should receive 200 OK from NiFi
        data = "login=Continue"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        resp = session.post(url, data=data, headers=headers, allow_redirects=True)
        if not resp.ok:
            raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")

        # Now, we are authenticated to NiFi, identified by a cookie (stored within our session object).
        # Use the cookie and ask for the JWT token we need for API requests
        url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/access/oidc/exchange"
        print(f"_nifi_get_jwt: POST request to: {url}")
        resp = session.post(url)  # POST must be used even though no data are being sent
        if not resp.ok:
            raise NifiUnexpectedReplyError(f"_nifi_get_jwt: Received unexpected HTTP status code ({resp.status_code}) from URL '{url}'.")
        # Body of the reply is the token itself
        return resp.text



def nifi_get_users() -> List[Dict]:
    """
    List users defined in NiFi

    A new JWT is requested (by _nifi_get_jwt()) and used to query the "/tenants/users" endpoint of NiFi API.

    :return List of dicts with keys 'id', 'name', 'groups' (list of group names), sorted by 'name'
    :raise NifiUnexpectedReplyError if NiFi returns an error status code or a reply which can't be parsed
        (the original parsing error is attached as the cause of the exception)
    """
    url = NIFI_API_BASE_URL.format(soctools_proxy=config.SOCTOOLSPROXY) + "/tenants/users"
    token = _nifi_get_jwt()
    resp = requests.get(url, headers={'Authorization': 'Bearer ' + token}, verify=config.CA_CERT_FILE)
    if not resp.ok:
        raise NifiUnexpectedReplyError(f"Can't get list of users from NiFi: Unexpected reply {resp.status_code}")
    try:
        # Pick only the fields we are interested in from each user entity
        users = [
            {
                'id': user["component"]["id"],
                'name': user["component"]["identity"],
                'groups': [g["component"]["identity"] for g in user["component"]["userGroups"]],
            }
            for user in resp.json()['users']
        ]
        users.sort(key=itemgetter('name'))
    except (ValueError, TypeError, KeyError) as err:
        # Reply is not a valid JSON or it doesn't have the expected structure
        raise NifiUnexpectedReplyError("Can't get list of users from NiFi: Unexpected content received") from err
    return users


def nifi_add_user(user: 'UserAccount'):
    """Add a new user to NiFi

    Not implemented yet - only the user name is read from the given account and the target group is selected,
    nothing is sent to NiFi.

    :raises NifiError
    """
    # no support for other groups in NiFi, yet
    group_name = "Administrators"
    account_name = user.username

    # TODO


def nifi_delete_user(user_name: str):
    """Delete a user from NiFi

    Not implemented yet - the function does nothing at the moment.

    :raises NifiError
    """
    # TODO
    return None