Newer
Older

Mohammad Torkashvand
committed
"""OpenID Connect and Open Policy Agent Integration for GSO Application.
This module provides helper functions and classes for handling OpenID Connect (OIDC) and
Open Policy Agent (OPA) related functionalities within the GSO application. It includes
implementations for OIDC-based user authentication and user information modeling. Additionally,
it facilitates making authorization decisions based on policies defined in OPA. Key components
comprise OIDCUser, OIDCUserModel, OPAResult, and opa_decision. These elements integrate with
FastAPI to ensure secure API development.
"""
import re
import ssl
from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping
from http import HTTPStatus
from json import JSONDecodeError
from typing import Any, ClassVar, cast
from fastapi.exceptions import HTTPException
from fastapi.param_functions import Depends
from fastapi.requests import Request
from fastapi.security.http import HTTPBearer
from httpx import AsyncClient, NetworkError
from pydantic import BaseModel
from starlette.requests import ClientDisconnect
from structlog import get_logger
from gso.auth.settings import oauth2lib_settings
logger = get_logger(__name__)
HTTPX_SSL_CONTEXT = ssl.create_default_context() # https://github.com/encode/httpx/issues/838

Mohammad Torkashvand
committed
_CALLBACK_STEP_API_URL_PATTERN = re.compile(
r"^/api/processes/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
r"/callback/([0-9a-zA-Z\-_]+)$"
)
def _is_callback_step_endpoint(request: Request) -> bool:
"""Check if the request is a callback step API call."""
return re.match(_CALLBACK_STEP_API_URL_PATTERN, request.url.path) is not None

Mohammad Torkashvand
committed
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class InvalidScopeValueError(ValueError):
"""Exception raised for invalid scope values in OIDC."""
class OIDCUserModel(dict):
"""The standard claims of a OIDCUserModel object. Defined per `Section 5.1`_ and AAI attributes.
.. _`Section 5.1`: http://openid.net/specs/openid-connect-core-1_0.html#StandardClaims
"""
#: registered claims that OIDCUserModel supports
REGISTERED_CLAIMS: ClassVar[list[str]] = [
"sub",
"name",
"given_name",
"family_name",
"middle_name",
"nickname",
"preferred_username",
"profile",
"picture",
"website",
"email",
"email_verified",
"gender",
"birthdate",
"zoneinfo",
"locale",
"phone_number",
"phone_number_verified",
"address",
"updated_at",
]
def __getattr__(self, key: str) -> Any:
"""Get an attribute value using key.
Overrides the default behavior to return the value from the dictionary
if the attribute is one of the registered claims or raises an AttributeError
if the key is not found.
:param str key: The attribute name to retrieve.
:return: The value of the attribute if it exists, otherwise raises AttributeError.

Mohammad Torkashvand
committed
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
try:
return object.__getattribute__(self, key)
except AttributeError as error:
if key in self.REGISTERED_CLAIMS:
return self.get(key)
raise error from None
@property
def user_name(self) -> str:
"""Return the username of the user."""
if "user_name" in self.keys():
return cast(str, self["user_name"])
if "unspecified_id" in self.keys():
return cast(str, self["unspecified_id"])
return ""
@property
def display_name(self) -> str:
"""Return the display name of the user."""
return self.get("display_name", "")
@property
def principal_name(self) -> str:
"""Return the principal name of the user."""
return self.get("eduperson_principal_name", "")
@property
def scopes(self) -> set[str]:
"""Return the scopes of the user."""
scope_value = self.get("scope")
if scope_value is None:
return set()
if isinstance(scope_value, list):
return {item for item in scope_value if isinstance(item, str)}
if isinstance(scope_value, str):
return set(filter(None, re.split("[ ,]", scope_value)))
message = f"Invalid scope value: {scope_value}"
raise InvalidScopeValueError(message)
async def _make_async_client() -> AsyncGenerator[AsyncClient, None]:
async with AsyncClient(http1=True, verify=HTTPX_SSL_CONTEXT) as client:
yield client
class OIDCConfig(BaseModel):
"""Configuration for OpenID Connect (OIDC) authentication and token validation."""
issuer: str
authorization_endpoint: str
token_endpoint: str
userinfo_endpoint: str
introspect_endpoint: str | None = None
introspection_endpoint: str | None = None
jwks_uri: str
response_types_supported: list[str]
response_modes_supported: list[str]
grant_types_supported: list[str]
subject_types_supported: list[str]
id_token_signing_alg_values_supported: list[str]
scopes_supported: list[str]
token_endpoint_auth_methods_supported: list[str]
claims_supported: list[str]
claims_parameter_supported: bool
request_parameter_supported: bool
code_challenge_methods_supported: list[str]
class OPAResult(BaseModel):
"""Represents the outcome of an authorization decision made by the Open Policy Agent (OPA).
Attributes
----------
- result (bool): Indicates whether the access request is allowed or denied.
- decision_id (str): A unique identifier for the decision made by OPA.

Mohammad Torkashvand
committed
"""
result: bool = False
decision_id: str
class OIDCUser(HTTPBearer):
"""OIDCUser class extends the :term:`HTTPBearer` class to do extra verification.
The class will act as follows:
1. Validate the Credentials at :term: `AAI` proxy by calling the UserInfo endpoint

Mohammad Torkashvand
committed
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
openid_config: OIDCConfig | None = None
openid_url: str
resource_server_id: str
resource_server_secret: str
def __init__(
self,
openid_url: str,
resource_server_id: str,
resource_server_secret: str,
*,
auto_error: bool = True,
scheme_name: str | None = None,
):
"""Set up OIDCUser with specified OpenID Connect configurations and credentials."""
super().__init__(auto_error=auto_error)
self.openid_url = openid_url
self.resource_server_id = resource_server_id
self.resource_server_secret = resource_server_secret
self.scheme_name = scheme_name or self.__class__.__name__
async def __call__( # type: ignore[override]
self, request: Request, token: str | None = None
) -> OIDCUserModel | None:
"""Return the OIDC user from OIDC introspect endpoint.
This is used as a security module in Fastapi projects
:param Request request: Starlette request method.
:param str token: Optional value to directly pass a token.
:return: OIDCUserModel object.

Mohammad Torkashvand
committed
"""
if not oauth2lib_settings.OAUTH2_ACTIVE:
return None
async with AsyncClient(http1=True, verify=HTTPX_SSL_CONTEXT) as async_request:
if not token:
credentials = await super().__call__(request)
if not credentials:
return None
token = credentials.credentials

Mohammad Torkashvand
committed
elif _is_callback_step_endpoint(request):
logger.debug(
"callback step endpoint is called. verification will be done by endpoint itself.", url=request.url
)
return None

Mohammad Torkashvand
committed

Mohammad Torkashvand
committed
await self.check_openid_config(async_request)

Mohammad Torkashvand
committed
intercepted_token = await self.introspect_token(async_request, token)
if "active" not in intercepted_token:
logger.error("Token doesn't have the mandatory 'active' key, probably caused by a caching problem")
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="Missing active key")
if not intercepted_token.get("active", False):
logger.info("User is not active", url=request.url, user_info=intercepted_token)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="User is not active")
user_info = await self.userinfo(async_request, token)
logger.debug("OIDCUserModel object.", intercepted_token=intercepted_token)
return user_info
async def check_openid_config(self, async_request: AsyncClient) -> None:
"""Check of openid config is loaded and load if not."""
if self.openid_config is not None:
return
response = await async_request.get(self.openid_url + "/.well-known/openid-configuration")
self.openid_config = OIDCConfig.parse_obj(response.json())
async def userinfo(self, async_request: AsyncClient, token: str) -> OIDCUserModel:
"""Get the userinfo from the openid server.
:param AsyncClient async_request: The async request
:param str token: the access_token
:return: OIDCUserModel: OIDC user model from openid server

Mohammad Torkashvand
committed
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
"""
await self.check_openid_config(async_request)
assert self.openid_config, "OpenID config should be loaded" # noqa: S101
response = await async_request.post(
self.openid_config.userinfo_endpoint,
data={"token": token},
headers={"Authorization": f"Bearer {token}"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse userinfo response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid userinfo", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Userinfo cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return OIDCUserModel(data)
async def introspect_token(self, async_request: AsyncClient, token: str) -> dict:
"""Introspect the access token to see if it is a valid token.
:param async_request: The async request
:param token: the access_token
:return: dict from openid server

Mohammad Torkashvand
committed
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
"""
await self.check_openid_config(async_request)
assert self.openid_config, "OpenID config should be loaded" # noqa: S101
endpoint = self.openid_config.introspect_endpoint or self.openid_config.introspection_endpoint or ""
response = await async_request.post(
endpoint,
data={"token": token, "client_id": self.resource_server_id},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse introspect response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid introspect", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Introspect cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return data
async def _get_decision(async_request: AsyncClient, opa_url: str, opa_input: dict) -> OPAResult:
logger.debug("Posting input json to Policy agent", opa_url=opa_url, input=opa_input)
try:
response = await async_request.post(opa_url, json=opa_input)
except (NetworkError, TypeError) as exc:
logger.debug("Could not get decision from policy agent", error=str(exc))
raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable") from exc
result = response.json()
logger.debug("Received response from Policy agent", response=result)
return OPAResult(result=result["result"]["allow"], decision_id=result["decision_id"])
def _evaluate_decision(decision: OPAResult, *, auto_error: bool, **context: dict[str, Any]) -> bool:
did = decision.decision_id
if decision.result:
logger.debug("User is authorized to access the resource", decision_id=did, **context)
return True
logger.debug("User is not allowed to access the resource", decision_id=did, **context)
if not auto_error:
return False
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"User is not allowed to access resource: {context.get('resource')} Decision was taken with id: {did}",
)
def opa_decision(
opa_url: str,
oidc_security: OIDCUser,
*,
auto_error: bool = True,
opa_kwargs: Mapping[str, str] | None = None,
) -> Callable[[Request, OIDCUserModel, AsyncClient], Awaitable[bool | None]]:
"""Create a decision function for Open Policy Agent (OPA) authorization checks.
This function generates an asynchronous decision function that can be used in FastAPI endpoints
to authorize requests based on OPA policies. It utilizes OIDC for user information and makes a
call to the OPA service to determine authorization.
:param str opa_url: URL of the Open Policy Agent service.
:param OIDCUser oidc_security: An instance of OIDCUser for user authentication.
:param bool auto_error: If True, automatically raises an HTTPException on authorization failure.
:param Mapping[str, str] | None opa_kwargs: Additional keyword arguments to be passed to the OPA input.

Mohammad Torkashvand
committed
:return: An asynchronous decision function that can be used as a dependency in FastAPI endpoints.

Mohammad Torkashvand
committed
"""
async def _opa_decision(
request: Request,
user_info: OIDCUserModel = Depends(oidc_security), # noqa: B008
async_request: AsyncClient = Depends(_make_async_client), # noqa: B008
) -> bool | None:
"""Check OIDCUserModel against the OPA policy.
This is used as a security module in Fastapi projects
This method will make an async call towards the Policy agent.
Args:
----
request: Request object that will be used to retrieve request metadata.
user_info: The OIDCUserModel object that will be checked
async_request: The :term:`httpx` client.

Mohammad Torkashvand
committed
"""
if not (oauth2lib_settings.OAUTH2_ACTIVE and oauth2lib_settings.OAUTH2_AUTHORIZATION_ACTIVE):
return None
if _is_callback_step_endpoint(request):
return None

Mohammad Torkashvand
committed
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
try:
json = await request.json()
# Silencing the Decode error or Type error when request.json() does not return anything sane.
# Some requests do not have a json response therefore as this code gets called on every request
# we need to suppress the `None` case (TypeError) or the `other than json` case (JSONDecodeError)
# Suppress AttributeError in case of websocket request, it doesn't have .json
except (JSONDecodeError, TypeError, ClientDisconnect, AttributeError):
json = {}
# defaulting to GET request method for WebSocket request, it doesn't have .method
request_method = request.method if hasattr(request, "method") else "GET"
opa_input = {
"input": {
**(opa_kwargs or {}),
**user_info,
"resource": request.url.path,
"method": request_method,
"arguments": {"path": request.path_params, "query": {**request.query_params}, "json": json},
}
}
decision = await _get_decision(async_request, opa_url, opa_input)
context = {
"resource": opa_input["input"]["resource"],
"method": opa_input["input"]["method"],
"user_info": user_info,
"input": opa_input,
"url": request.url,
}
return _evaluate_decision(decision, auto_error=auto_error, **context)
return _opa_decision