Skip to content
Snippets Groups Projects
Commit ba98cc43 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

refacrtor authentication

parent edce322f
No related branches found
No related tags found
1 merge request!215Feature/nat 468 refactor auth
"""The main entrypoint for :term:`GSO`, and the different ways in which it can be run."""
from gso import monkeypatches # noqa: F401, isort:skip
import typer
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
......
......@@ -6,10 +6,11 @@ from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from orchestrator.db import ProcessStepTable
from orchestrator.schemas.base import OrchestratorBaseModel
from orchestrator.security import get_authorization
from gso.auth.security import opa_security_default
authorization = get_authorization()
router = APIRouter(prefix="/processes", tags=["Processes"], dependencies=[Depends(opa_security_default)])
router = APIRouter(prefix="/processes", tags=["Processes"], dependencies=[Depends(authorization.authorize)])
class CallBackResultsBaseModel(OrchestratorBaseModel):
......
"""OpenID Connect and Open Policy Agent Integration for GSO Application.
This module provides helper functions and classes for handling OpenID Connect (OIDC) and
Open Policy Agent (OPA) related functionalities within the GSO application. It includes
implementations for OIDC-based user authentication and user information modeling. Additionally,
it facilitates making authorization decisions based on policies defined in OPA. Key components
comprise OIDCUser, OIDCUserModel, OPAResult, and opa_decision. These elements integrate with
FastAPI to ensure secure API development.
"""
import re
from functools import wraps
from http import HTTPStatus
from json import JSONDecodeError
from fastapi.exceptions import HTTPException
from fastapi.requests import Request
from httpx import AsyncClient
from oauth2_lib.fastapi import OIDCAuth, OIDCUserModel
from oauth2_lib.settings import oauth2lib_settings
from structlog import get_logger
logger = get_logger(__name__)
_CALLBACK_STEP_API_URL_PATTERN = re.compile(
r"^/api/processes/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
r"/callback/([0-9a-zA-Z\-_]+)$"
)
def _is_callback_step_endpoint(request: Request) -> bool:
"""Check if the request is a callback step API call."""
return re.match(_CALLBACK_STEP_API_URL_PATTERN, request.url.path) is not None
def ensure_openid_config_loaded(func):
@wraps(func)
async def wrapper(self, async_request: AsyncClient, *args, **kwargs):
await self.check_openid_config(async_request)
return await func(self, async_request, *args, **kwargs)
return wrapper
class OIDCAuthentication(OIDCAuth):
"""OIDCUser class extends the :term:`HTTPBearer` class to do extra verification.
The class will act as follows:
1. Validate the Credentials at :term: `AAI` proxy by calling the UserInfo endpoint
"""
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(OIDCAuthentication, cls).__new__(cls)
return cls._instance
@staticmethod
async def is_bypassable_request(request: Request) -> bool:
return _is_callback_step_endpoint(request=request)
@ensure_openid_config_loaded
async def userinfo(self, async_request: AsyncClient, token: str) -> OIDCUserModel:
"""Get the userinfo from the openid server.
:param AsyncClient async_request: The async request
:param str token: the access_token
:return: OIDCUserModel: OIDC user model from openid server
"""
await self.introspect_token(async_request, token)
response = await async_request.post(
self.openid_config.userinfo_endpoint,
data={"token": token},
headers={"Authorization": f"Bearer {token}"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse userinfo response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid userinfo", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Userinfo cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return OIDCUserModel(data)
@ensure_openid_config_loaded
async def introspect_token(self, async_request: AsyncClient, token: str) -> dict:
"""Introspect the access token to see if it is a valid token.
:param async_request: The async request
:param token: the access_token
:return: dict from openid server
"""
endpoint = self.openid_config.introspect_endpoint or self.openid_config.introspection_endpoint or ""
response = await async_request.post(
endpoint,
data={"token": token, "client_id": self.resource_server_id},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse introspect response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid introspect", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Introspect cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
if "active" not in data:
logger.error("Token doesn't have the mandatory 'active' key, probably caused by a caching problem")
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="Missing active key")
if not data.get("active", False):
logger.info("User is not active", user_info=data)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="User is not active")
return data
oidc_instance = OIDCAuthentication(
openid_url=oauth2lib_settings.OIDC_BASE_URL,
openid_config_url=oauth2lib_settings.OIDC_CONF_URL, # Corrected parameter name
resource_server_id=oauth2lib_settings.OAUTH2_RESOURCE_SERVER_ID,
resource_server_secret=oauth2lib_settings.OAUTH2_RESOURCE_SERVER_SECRET,
oidc_user_model_cls=OIDCUserModel,
)
"""OpenID Connect and Open Policy Agent Integration for GSO Application.
This module provides helper functions and classes for handling OpenID Connect (OIDC) and
Open Policy Agent (OPA) related functionalities within the GSO application. It includes
implementations for OIDC-based user authentication and user information modeling. Additionally,
it facilitates making authorization decisions based on policies defined in OPA. Key components
comprise OIDCUser, OIDCUserModel, OPAResult, and opa_decision. These elements integrate with
FastAPI to ensure secure API development.
"""
import re
import ssl
from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping
from http import HTTPStatus
from json import JSONDecodeError
from typing import Any, ClassVar, cast
from fastapi.exceptions import HTTPException
from fastapi.param_functions import Depends
from fastapi.requests import Request
from fastapi.security.http import HTTPBearer
from httpx import AsyncClient, NetworkError
from pydantic import BaseModel
from starlette.requests import ClientDisconnect
from structlog import get_logger
from gso.auth.settings import oauth2lib_settings
logger = get_logger(__name__)
HTTPX_SSL_CONTEXT = ssl.create_default_context() # https://github.com/encode/httpx/issues/838
_CALLBACK_STEP_API_URL_PATTERN = re.compile(
r"^/api/processes/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
r"/callback/([0-9a-zA-Z\-_]+)$"
)
def _is_callback_step_endpoint(request: Request) -> bool:
"""Check if the request is a callback step API call."""
return re.match(_CALLBACK_STEP_API_URL_PATTERN, request.url.path) is not None
class InvalidScopeValueError(ValueError):
"""Exception raised for invalid scope values in OIDC."""
class OIDCUserModel(dict):
"""The standard claims of a OIDCUserModel object. Defined per `Section 5.1`_ and AAI attributes.
.. _`Section 5.1`: http://openid.net/specs/openid-connect-core-1_0.html#StandardClaims
"""
#: registered claims that OIDCUserModel supports
REGISTERED_CLAIMS: ClassVar[list[str]] = [
"sub",
"name",
"given_name",
"family_name",
"middle_name",
"nickname",
"preferred_username",
"profile",
"picture",
"website",
"email",
"email_verified",
"gender",
"birthdate",
"zoneinfo",
"locale",
"phone_number",
"phone_number_verified",
"address",
"updated_at",
]
def __getattr__(self, key: str) -> Any:
"""Get an attribute value using key.
Overrides the default behavior to return the value from the dictionary
if the attribute is one of the registered claims or raises an AttributeError
if the key is not found.
:param str key: The attribute name to retrieve.
:return: The value of the attribute if it exists, otherwise raises AttributeError.
"""
try:
return object.__getattribute__(self, key)
except AttributeError as error:
if key in self.REGISTERED_CLAIMS:
return self.get(key)
raise error from None
@property
def client_id(self) -> str:
"""Return the client id."""
return self.get("client_id") or ""
@property
def user_name(self) -> str:
"""Return the username of the user."""
if "user_name" in self.keys():
return cast(str, self["user_name"])
if "unspecified_id" in self.keys():
return cast(str, self["unspecified_id"])
return ""
@property
def display_name(self) -> str:
"""Return the display name of the user."""
return self.get("display_name", "")
@property
def principal_name(self) -> str:
"""Return the principal name of the user."""
return self.get("eduperson_principal_name", "")
@property
def scopes(self) -> set[str]:
"""Return the scopes of the user."""
scope_value = self.get("scope")
if scope_value is None:
return set()
if isinstance(scope_value, list):
return {item for item in scope_value if isinstance(item, str)}
if isinstance(scope_value, str):
return set(filter(None, re.split("[ ,]", scope_value)))
message = f"Invalid scope value: {scope_value}"
raise InvalidScopeValueError(message)
async def _make_async_client() -> AsyncGenerator[AsyncClient, None]:
async with AsyncClient(http1=True, verify=HTTPX_SSL_CONTEXT) as client:
yield client
class OIDCConfig(BaseModel):
"""Configuration for OpenID Connect (OIDC) authentication and token validation."""
issuer: str
authorization_endpoint: str
token_endpoint: str
userinfo_endpoint: str
introspect_endpoint: str | None = None
introspection_endpoint: str | None = None
jwks_uri: str
response_types_supported: list[str]
response_modes_supported: list[str]
grant_types_supported: list[str]
subject_types_supported: list[str]
id_token_signing_alg_values_supported: list[str]
scopes_supported: list[str]
token_endpoint_auth_methods_supported: list[str]
claims_supported: list[str]
claims_parameter_supported: bool
request_parameter_supported: bool
code_challenge_methods_supported: list[str]
class OPAResult(BaseModel):
"""Represents the outcome of an authorization decision made by the Open Policy Agent (OPA).
Attributes
----------
- result (bool): Indicates whether the access request is allowed or denied.
- decision_id (str): A unique identifier for the decision made by OPA.
"""
result: bool = False
decision_id: str
class OIDCUser(HTTPBearer):
"""OIDCUser class extends the :term:`HTTPBearer` class to do extra verification.
The class will act as follows:
1. Validate the Credentials at :term: `AAI` proxy by calling the UserInfo endpoint
"""
openid_config: OIDCConfig | None = None
openid_url: str
resource_server_id: str
resource_server_secret: str
def __init__(
self,
openid_url: str,
resource_server_id: str,
resource_server_secret: str,
*,
auto_error: bool = True,
scheme_name: str | None = None,
):
"""Set up OIDCUser with specified OpenID Connect configurations and credentials."""
super().__init__(auto_error=auto_error)
self.openid_url = openid_url
self.resource_server_id = resource_server_id
self.resource_server_secret = resource_server_secret
self.scheme_name = scheme_name or self.__class__.__name__
async def __call__( # type: ignore[override]
self, request: Request, token: str | None = None
) -> OIDCUserModel | None:
"""Return the OIDC user from OIDC introspect endpoint.
This is used as a security module in Fastapi projects
:param Request request: Starlette request method.
:param str token: Optional value to directly pass a token.
:return: OIDCUserModel object.
"""
if not oauth2lib_settings.OAUTH2_ACTIVE:
return None
async with AsyncClient(http1=True, verify=HTTPX_SSL_CONTEXT) as async_request:
if not token:
credentials = await super().__call__(request)
if not credentials:
return None
token = credentials.credentials
elif _is_callback_step_endpoint(request):
logger.debug(
"callback step endpoint is called. verification will be done by endpoint itself.", url=request.url
)
return None
await self.check_openid_config(async_request)
intercepted_token = await self.introspect_token(async_request, token)
if "active" not in intercepted_token:
logger.error("Token doesn't have the mandatory 'active' key, probably caused by a caching problem")
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="Missing active key")
if not intercepted_token.get("active", False):
logger.info("User is not active", url=request.url, user_info=intercepted_token)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail="User is not active")
client_id = intercepted_token.get("client_id")
if "sub" not in intercepted_token:
return OIDCUserModel(client_id=client_id)
user_info = await self.userinfo(async_request, token)
user_info["client_id"] = client_id
logger.debug("OIDCUserModel object.", intercepted_token=intercepted_token)
return user_info
async def check_openid_config(self, async_request: AsyncClient) -> None:
"""Check of openid config is loaded and load if not."""
if self.openid_config is not None:
return
response = await async_request.get(self.openid_url + "/.well-known/openid-configuration")
self.openid_config = OIDCConfig.model_validate(response.json())
async def userinfo(self, async_request: AsyncClient, token: str) -> OIDCUserModel:
"""Get the userinfo from the openid server.
:param AsyncClient async_request: The async request
:param str token: the access_token
:return: OIDCUserModel: OIDC user model from openid server
"""
await self.check_openid_config(async_request)
assert self.openid_config, "OpenID config should be loaded" # noqa: S101
response = await async_request.post(
self.openid_config.userinfo_endpoint,
data={"token": token},
headers={"Authorization": f"Bearer {token}"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse userinfo response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid userinfo", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Userinfo cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return OIDCUserModel(data)
async def introspect_token(self, async_request: AsyncClient, token: str) -> dict:
"""Introspect the access token to see if it is a valid token.
:param async_request: The async request
:param token: the access_token
:return: dict from openid server
"""
await self.check_openid_config(async_request)
assert self.openid_config, "OpenID config should be loaded" # noqa: S101
endpoint = self.openid_config.introspect_endpoint or self.openid_config.introspection_endpoint or ""
response = await async_request.post(
endpoint,
data={"token": token, "client_id": self.resource_server_id},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
data = dict(response.json())
except JSONDecodeError as err:
logger.debug(
"Unable to parse introspect response",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text) from err
logger.debug("Response from openid introspect", response=data)
if response.status_code not in range(200, 300):
logger.debug(
"Introspect cannot find an active token, user unauthorized",
detail=response.text,
resource_server_id=self.resource_server_id,
openid_url=self.openid_url,
)
raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED, detail=response.text)
return data
async def _get_decision(async_request: AsyncClient, opa_url: str, opa_input: dict) -> OPAResult:
logger.debug("Posting input json to Policy agent", opa_url=opa_url, input=opa_input)
try:
response = await async_request.post(opa_url, json=opa_input)
except (NetworkError, TypeError) as exc:
logger.debug("Could not get decision from policy agent", error=str(exc))
raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable") from exc
result = response.json()
logger.debug("Received response from Policy agent", response=result)
return OPAResult(result=result["result"]["allow"], decision_id=result["decision_id"])
def _evaluate_decision(decision: OPAResult, *, auto_error: bool, **context: dict[str, Any]) -> bool:
did = decision.decision_id
if decision.result:
logger.debug("User is authorized to access the resource", decision_id=did, **context)
return True
logger.debug("User is not allowed to access the resource", decision_id=did, **context)
if not auto_error:
return False
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"User is not allowed to access resource: {context.get("resource")} Decision was taken with id: {did}",
)
def opa_decision(
opa_url: str,
oidc_security: OIDCUser,
*,
auto_error: bool = True,
opa_kwargs: Mapping[str, str] | None = None,
) -> Callable[[Request, OIDCUserModel, AsyncClient], Awaitable[bool | None]]:
"""Create a decision function for Open Policy Agent (OPA) authorization checks.
This function generates an asynchronous decision function that can be used in FastAPI endpoints
to authorize requests based on OPA policies. It utilizes OIDC for user information and makes a
call to the OPA service to determine authorization.
:param str opa_url: URL of the Open Policy Agent service.
:param OIDCUser oidc_security: An instance of OIDCUser for user authentication.
:param bool auto_error: If True, automatically raises an HTTPException on authorization failure.
:param Mapping[str, str] | None opa_kwargs: Additional keyword arguments to be passed to the OPA input.
:return: An asynchronous decision function that can be used as a dependency in FastAPI endpoints.
"""
async def _opa_decision(
request: Request,
user_info: OIDCUserModel = Depends(oidc_security), # noqa: B008
async_request: AsyncClient = Depends(_make_async_client), # noqa: B008
) -> bool | None:
"""Check OIDCUserModel against the OPA policy.
This is used as a security module in Fastapi projects
This method will make an async call towards the Policy agent.
Args:
----
request: Request object that will be used to retrieve request metadata.
user_info: The OIDCUserModel object that will be checked
async_request: The :term:`httpx` client.
"""
if not (oauth2lib_settings.OAUTH2_ACTIVE and oauth2lib_settings.OAUTH2_AUTHORIZATION_ACTIVE):
return None
if _is_callback_step_endpoint(request):
return None
try:
json = await request.json()
# Silencing the Decode error or Type error when request.json() does not return anything sane.
# Some requests do not have a json response therefore as this code gets called on every request
# we need to suppress the `None` case (TypeError) or the `other than json` case (JSONDecodeError)
# Suppress AttributeError in case of websocket request, it doesn't have .json
except (JSONDecodeError, TypeError, ClientDisconnect, AttributeError):
json = {}
# defaulting to GET request method for WebSocket request, it doesn't have .method
request_method = request.method if hasattr(request, "method") else "GET"
opa_input = {
"input": {
**(opa_kwargs or {}),
**user_info,
"resource": request.url.path,
"method": request_method.upper(),
"arguments": {"path": request.path_params, "query": {**request.query_params}, "json": json},
}
}
decision = await _get_decision(async_request, opa_url, opa_input)
context = {
"resource": opa_input["input"]["resource"],
"method": opa_input["input"]["method"],
"user_info": user_info,
"input": opa_input,
"url": request.url,
}
return _evaluate_decision(decision, auto_error=auto_error, **context)
return _opa_decision
"""Module for initializing OAuth client credentials and OIDC user."""
from authlib.integrations.starlette_client import OAuth
from nwastdlib.url import URL
from gso.auth.oidc_policy_helper import HTTPX_SSL_CONTEXT, OIDCUser, opa_decision
from gso.auth.settings import oauth2_settings
oauth_client_credentials = OAuth()
well_known_endpoint = URL(oauth2_settings.OIDC_CONF_WELL_KNOWN_URL)
oauth_client_credentials.register(
"connext",
server_metadata_url=well_known_endpoint / ".well-known" / "openid-configuration",
client_id=oauth2_settings.OAUTH2_RESOURCE_SERVER_ID,
client_secret=oauth2_settings.OAUTH2_RESOURCE_SERVER_SECRET,
request_token_params={"grant_type": "client_credentials"},
client_kwargs={"verify": HTTPX_SSL_CONTEXT},
)
oidc_user = OIDCUser(
oauth2_settings.OIDC_CONF_WELL_KNOWN_URL,
oauth2_settings.OAUTH2_RESOURCE_SERVER_ID,
oauth2_settings.OAUTH2_RESOURCE_SERVER_SECRET,
)
opa_security_default = opa_decision(oauth2_settings.OPA_URL, oidc_user)
def get_oidc_user() -> OIDCUser:
"""Retrieve the global OIDCUser instance.
This function returns the instance of OIDCUser initialized in the module.
It is typically used for accessing the OIDCUser across different parts of the application.
:return OIDCUser: The instance of OIDCUser configured with OAuth2 settings.
"""
return oidc_user
"""Override certain classes and settings in the oauth2_lib.fastapi package with custom implementations.
This adjustment is typically done to extend or modify the functionality of the original
oauth2_lib package to meet specific requirements of the gso application.
"""
import oauth2_lib.fastapi
import oauth2_lib.settings
from gso.auth.oidc_policy_helper import HTTPX_SSL_CONTEXT, OIDCUser, OIDCUserModel, _get_decision, opa_decision
from gso.auth.settings import oauth2lib_settings
oauth2_lib.fastapi.OIDCUser = OIDCUser # type: ignore[assignment, misc]
oauth2_lib.fastapi.OIDCUserModel = OIDCUserModel # type: ignore[assignment, misc]
oauth2_lib.fastapi.opa_decision = opa_decision # type: ignore[assignment]
oauth2_lib.fastapi._get_decision = _get_decision # type: ignore[assignment] # noqa: SLF001
oauth2_lib.fastapi.HTTPX_SSL_CONTEXT = HTTPX_SSL_CONTEXT
oauth2_lib.settings.oauth2lib_settings = oauth2lib_settings # type: ignore[assignment]
......@@ -102,5 +102,8 @@
"p_router": "UUID"
},
"scopes": ["https://graph.microsoft.com/.default"]
},
"AUTH": {
"opa_url": "http://127.0.0.1:8181/v1/data/gap/gso/api/access"
}
}
......@@ -171,6 +171,11 @@ class SharepointParams(BaseSettings):
scopes: list[str]
class AuthParams(BaseSettings):
"""Parameters for the authentication service."""
opa_url: str
class OSSParams(BaseSettings):
"""The set of parameters required for running :term:`GSO`."""
......@@ -183,6 +188,7 @@ class OSSParams(BaseSettings):
THIRD_PARTY_API_KEYS: dict[str, str]
EMAIL: EmailParams
SHAREPOINT: SharepointParams
AUTH: AuthParams
def load_oss_params() -> OSSParams:
......
......@@ -5,9 +5,9 @@ import pytest
from fastapi import HTTPException, Request
from httpx import AsyncClient, NetworkError, Response
from gso.auth.oidc_policy_helper import (
from gso.auth.oidc import (
OIDCConfig,
OIDCUser, OIDCUserModel, OPAResult, opa_decision, _get_decision, _evaluate_decision, _is_callback_step_endpoint,
OIDCAuthentication, OIDCUserModel, OPAResult, opa_decision, _get_decision, _evaluate_decision, _is_callback_step_endpoint,
)
from gso.auth.settings import oauth2lib_settings
......@@ -46,7 +46,7 @@ def mock_openid_config():
@pytest.fixture()
def oidc_user(mock_openid_config):
user = OIDCUser(
user = OIDCAuthentication(
openid_url="https://example.proxy.aai.geant.org",
resource_server_id="resource_server",
resource_server_secret="secret", # noqa: S106
......@@ -70,7 +70,7 @@ def mock_request():
@pytest.fixture()
def mock_oidc_user():
oidc_user = AsyncMock(
OIDCUser,
OIDCAuthentication,
openid_url="https://example.com",
resource_server_id="test",
resource_server_secret="secret", # noqa: S106
......@@ -142,7 +142,7 @@ async def test_userinfo_success(oidc_user, mock_async_client):
async def test_opa_decision_success(mock_request, mock_async_client):
mock_user_info = OIDCUserModel({"sub": "123", "name": "John Doe", "email": "johndoe@example.com"})
mock_oidc_user = AsyncMock(spec=OIDCUser)
mock_oidc_user = AsyncMock(spec=OIDCAuthentication)
mock_oidc_user.return_value = AsyncMock(return_value=mock_user_info)
with patch(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment