-
Mohammad Torkashvand authoredMohammad Torkashvand authored
oidc_policy_helper.py 16.42 KiB
"""OpenID Connect and Open Policy Agent Integration for GSO Application.
This module provides helper functions and classes for handling OpenID Connect (OIDC) and
Open Policy Agent (OPA) related functionalities within the GSO application. It includes
implementations for OIDC-based user authentication and user information modeling. Additionally,
it facilitates making authorization decisions based on policies defined in OPA. Key components
comprise OIDCUser, OIDCUserModel, OPAResult, and opa_decision. These elements integrate with
FastAPI to ensure secure API development.
"""
import re
import ssl
from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping
from http import HTTPStatus
from json import JSONDecodeError
from typing import Any, ClassVar, cast
from fastapi.exceptions import HTTPException
from fastapi.param_functions import Depends
from fastapi.requests import Request
from fastapi.security.http import HTTPBearer
from httpx import AsyncClient, NetworkError
from pydantic import BaseModel
from starlette.requests import ClientDisconnect
from structlog import get_logger
from gso.auth.settings import oauth2lib_settings
logger = get_logger(__name__)
HTTPX_SSL_CONTEXT = ssl.create_default_context() # https://github.com/encode/httpx/issues/838
_CALLBACK_STEP_API_URL_PATTERN = re.compile(
r"^/api/processes/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
r"/callback/([0-9a-zA-Z\-_]+)$"
)
def _is_callback_step_endpoint(request: Request) -> bool:
"""Check if the request is a callback step API call."""
return re.match(_CALLBACK_STEP_API_URL_PATTERN, request.url.path) is not None
class InvalidScopeValueError(ValueError):
"""Exception raised for invalid scope values in OIDC."""
class OIDCUserModel(dict):
"""The standard claims of a OIDCUserModel object. Defined per `Section 5.1`_ and AAI attributes.
.. _`Section 5.1`: http://openid.net/specs/openid-connect-core-1_0.html#StandardClaims
"""
#: registered claims that OIDCUserModel supports
REGISTERED_CLAIMS: ClassVar[list[str]] = [
"sub",
"name",
"given_name",
"family_name",
"middle_name",
"nickname",
"preferred_username",
"profile",
"picture",
"website",
"email",
"email_verified",
"gender",
"birthdate",
"zoneinfo",
"locale",
"phone_number",
"phone_number_verified",
"address",
"updated_at",
]
def __getattr__(self, key: str) -> Any:
"""Get an attribute value using key.
Overrides the default behavior to return the value from the dictionary
if the attribute is one of the registered claims or raises an AttributeError
if the key is not found.
Args:
----
key: The attribute name to retrieve.
Returns:
-------
The value of the attribute if it exists, otherwise raises AttributeError.
"""
try:
return object.__getattribute__(self, key)
except AttributeError as error:
if key in self.REGISTERED_CLAIMS:
return self.get(key)
raise error from None
@property
def user_name(self) -> str:
"""Return the username of the user."""
if "user_name" in self.keys():
return cast(str, self["user_name"])
if "unspecified_id" in self.keys():
return cast(str, self["unspecified_id"])
return ""
@property
def display_name(self) -> str:
"""Return the display name of the user."""
return self.get("display_name", "")
@property
def principal_name(self) -> str:
"""Return the principal name of the user."""
return self.get("eduperson_principal_name", "")
@property
def scopes(self) -> set[str]:
"""Return the scopes of the user."""
scope_value = self.get("scope")
if scope_value is None:
return set()
if isinstance(scope_value, list):
return {item for item in scope_value if isinstance(item, str)}
if isinstance(scope_value, str):
return set(filter(None, re.split("[ ,]", scope_value)))
message = f"Invalid scope value: {scope_value}"
raise InvalidScopeValueError(message)
async def _make_async_client() -> AsyncGenerator[AsyncClient, None]:
async with AsyncClient(http1=True, verify=HTTPX_SSL_CONTEXT) as client:
yield client
class OIDCConfig(BaseModel):
"""Configuration for OpenID Connect (OIDC) authentication and token validation."""
issuer: str
authorization_endpoint: str
token_endpoint: str
userinfo_endpoint: str
introspect_endpoint: str | None = None
introspection_endpoint: str | None = None
jwks_uri: str
response_types_supported: list[str]
response_modes_supported: list[str]
grant_types_supported: list[str]
subject_types_supported: list[str]
id_token_signing_alg_values_supported: list[str]
scopes_supported: list[str]
token_endpoint_auth_methods_supported: list[str]
claims_supported: list[str]
claims_parameter_supported: bool
request_parameter_supported: bool
code_challenge_methods_supported: list[str]
class OPAResult(BaseModel):
"""Represents the outcome of an authorization decision made by the Open Policy Agent (OPA).
Attributes
----------
- result (bool): Indicates whether the access request is allowed or denied.
- decision_id (str): A unique identifier for the decision made by OPA.
"""
result: bool = False
decision_id: str
class OIDCUser(HTTPBearer):
"""OIDCUser class extends the :term:`HTTPBearer` class to do extra verification.
The class will act as follows:
1. Validate the Credentials at :term: `AAI` proxy by calling the UserInfo endpoint
"""
openid_config: OIDCConfig | None = None
openid_url: str
resource_server_id: str
resource_server_secret: str
def __init__(
self,
openid_url: str,
resource_server_id: str,
resource_server_secret: str,
*,
auto_error: bool = True,
scheme_name: str | None = None,
):
"""Set up OIDCUser with specified OpenID Connect configurations and credentials."""
super().__init__(auto_error=auto_error)
self.openid_url = openid_url
self.resource_server_id = resource_server_id
self.resource_server_secret = resource_server_secret
self.scheme_name = scheme_name or self.__class__.__name__
async def __call__( # type: ignore[override]
self, request: Request, token: str | None = None
) -> OIDCUserModel | None:
"""Return the OIDC user from OIDC introspect endpoint.
This is used as a security module in Fastapi projects
Args:
----
request: Starlette request method.
token: Optional value to directly pass a token.
Returns:
-------
OIDCUserModel object.
"""
if not oauth2lib_settings.OAUTH2_ACTIVE:
return None
async with AsyncClient(http1=True, verify=HTTPX_SSL_CONTEXT) as async_request:
if not token:
credentials = await super().__call__(request)
if not credentials:
return None
token = credentials.credentials
elif _is_callback_step_endpoint(request):
logger.debug(
"callback step endpoint is called. verification will be done by endpoint itself.", url=request.url
)
return None
await self.check_openid_config(async_request)
intercepted_token = await self.introspect_token(async_request, token)
if "active" not in intercepted_token:
logger.error("Token doesn't have the mandatory 'active' key, probably caused by a caching problem")
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="Missing active key")
if not intercepted_token.get("active", False):
logger.info("User is not active", url=request.url, user_info=intercepted_token)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="User is not active")
user_info = await self.userinfo(async_request, token)
logger.debug("OIDCUserModel object.", intercepted_token=intercepted_token)
return user_info
async def check_openid_config(self, async_request: AsyncClient) -> None:
"""Check of openid config is loaded and load if not."""
if self.openid_config is not None:
return
response = await async_request.get(self.openid_url + "/.well-known/openid-configuration")
self.openid_config = OIDCConfig.parse_obj(response.json())
async def userinfo(self, async_request: AsyncClient, token: str) -> OIDCUserModel:
"""Get the userinfo from the openid server.
:param AsyncClient async_request: The async request
:param str token: the access_token
:return: OIDCUserModel: OIDC user model from openid server
"""
await self.check_openid_config(async_request)
assert self.openid_config, "OpenID config should be loaded" # noqa: S101
response = await async_request.post(
self.openid_config.userinfo_endpoint,
data={"token": token},
headers={"Authorization": f"Bearer {token}"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse userinfo response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid userinfo", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Userinfo cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return OIDCUserModel(data)
async def introspect_token(self, async_request: AsyncClient, token: str) -> dict:
"""Introspect the access token to see if it is a valid token.
:param async_request: The async request
:param token: the access_token
:return: dict from openid server
"""
await self.check_openid_config(async_request)
assert self.openid_config, "OpenID config should be loaded" # noqa: S101
endpoint = self.openid_config.introspect_endpoint or self.openid_config.introspection_endpoint or ""
response = await async_request.post(
endpoint,
data={"token": token, "client_id": self.resource_server_id},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse introspect response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid introspect", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Introspect cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return data
async def _get_decision(async_request: AsyncClient, opa_url: str, opa_input: dict) -> OPAResult:
logger.debug("Posting input json to Policy agent", opa_url=opa_url, input=opa_input)
try:
response = await async_request.post(opa_url, json=opa_input)
except (NetworkError, TypeError) as exc:
logger.debug("Could not get decision from policy agent", error=str(exc))
raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable") from exc
result = response.json()
logger.debug("Received response from Policy agent", response=result)
return OPAResult(result=result["result"]["allow"], decision_id=result["decision_id"])
def _evaluate_decision(decision: OPAResult, *, auto_error: bool, **context: dict[str, Any]) -> bool:
did = decision.decision_id
if decision.result:
logger.debug("User is authorized to access the resource", decision_id=did, **context)
return True
logger.debug("User is not allowed to access the resource", decision_id=did, **context)
if not auto_error:
return False
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"User is not allowed to access resource: {context.get('resource')} Decision was taken with id: {did}",
)
def opa_decision(
opa_url: str,
oidc_security: OIDCUser,
*,
auto_error: bool = True,
opa_kwargs: Mapping[str, str] | None = None,
) -> Callable[[Request, OIDCUserModel, AsyncClient], Awaitable[bool | None]]:
"""Create a decision function for Open Policy Agent (OPA) authorization checks.
This function generates an asynchronous decision function that can be used in FastAPI endpoints
to authorize requests based on OPA policies. It utilizes OIDC for user information and makes a
call to the OPA service to determine authorization.
Args:
----
opa_url: URL of the Open Policy Agent service.
oidc_security: An instance of OIDCUser for user authentication.
auto_error: If True, automatically raises an HTTPException on authorization failure.
opa_kwargs: Additional keyword arguments to be passed to the OPA input.
Returns:
-------
An asynchronous decision function that can be used as a dependency in FastAPI endpoints.
"""
async def _opa_decision(
request: Request,
user_info: OIDCUserModel = Depends(oidc_security), # noqa: B008
async_request: AsyncClient = Depends(_make_async_client), # noqa: B008
) -> bool | None:
"""Check OIDCUserModel against the OPA policy.
This is used as a security module in Fastapi projects
This method will make an async call towards the Policy agent.
Args:
----
request: Request object that will be used to retrieve request metadata.
user_info: The OIDCUserModel object that will be checked
async_request: The :term:`httpx` client.
"""
if not (oauth2lib_settings.OAUTH2_ACTIVE and oauth2lib_settings.OAUTH2_AUTHORIZATION_ACTIVE):
return None
try:
json = await request.json()
# Silencing the Decode error or Type error when request.json() does not return anything sane.
# Some requests do not have a json response therefore as this code gets called on every request
# we need to suppress the `None` case (TypeError) or the `other than json` case (JSONDecodeError)
# Suppress AttributeError in case of websocket request, it doesn't have .json
except (JSONDecodeError, TypeError, ClientDisconnect, AttributeError):
json = {}
# defaulting to GET request method for WebSocket request, it doesn't have .method
request_method = request.method if hasattr(request, "method") else "GET"
opa_input = {
"input": {
**(opa_kwargs or {}),
**user_info,
"resource": request.url.path,
"method": request_method,
"arguments": {"path": request.path_params, "query": {**request.query_params}, "json": json},
}
}
decision = await _get_decision(async_request, opa_url, opa_input)
context = {
"resource": opa_input["input"]["resource"],
"method": opa_input["input"]["method"],
"user_info": user_info,
"input": opa_input,
"url": request.url,
}
return _evaluate_decision(decision, auto_error=auto_error, **context)
return _opa_decision