Skip to content
Snippets Groups Projects

add client_id to the user_info sent to opa

Merged Mohammad Torkashvand requested to merge feature/NAT-553-aai-client-credential into develop
All threads resolved!
1 file
+ 17
18
Compare changes
  • Side-by-side
  • Inline
@@ -11,7 +11,6 @@ FastAPI to ensure secure API development.
import re
import ssl
from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping
from enum import StrEnum
from http import HTTPStatus
from json import JSONDecodeError
from typing import Any, ClassVar, cast
@@ -188,13 +187,13 @@ class OIDCUser(HTTPBearer):
resource_server_secret: str
def __init__(
self,
openid_url: str,
resource_server_id: str,
resource_server_secret: str,
*,
auto_error: bool = True,
scheme_name: str | None = None,
self,
openid_url: str,
resource_server_id: str,
resource_server_secret: str,
*,
auto_error: bool = True,
scheme_name: str | None = None,
):
"""Set up OIDCUser with specified OpenID Connect configurations and credentials."""
super().__init__(auto_error=auto_error)
@@ -204,7 +203,7 @@ class OIDCUser(HTTPBearer):
self.scheme_name = scheme_name or self.__class__.__name__
async def __call__( # type: ignore[override]
self, request: Request, token: str | None = None
self, request: Request, token: str | None = None
) -> OIDCUserModel | None:
"""Return the OIDC user from OIDC introspect endpoint.
@@ -242,7 +241,7 @@ class OIDCUser(HTTPBearer):
user_info = await self.userinfo(async_request, token)
user_info['client_id'] = intercepted_token.get("client_id")
user_info["client_id"] = intercepted_token.get("client_id")
logger.debug("OIDCUserModel object.", intercepted_token=intercepted_token)
return user_info
@@ -367,11 +366,11 @@ def _evaluate_decision(decision: OPAResult, *, auto_error: bool, **context: dict
def opa_decision(
opa_url: str,
oidc_security: OIDCUser,
*,
auto_error: bool = True,
opa_kwargs: Mapping[str, str] | None = None,
opa_url: str,
oidc_security: OIDCUser,
*,
auto_error: bool = True,
opa_kwargs: Mapping[str, str] | None = None,
) -> Callable[[Request, OIDCUserModel, AsyncClient], Awaitable[bool | None]]:
"""Create a decision function for Open Policy Agent (OPA) authorization checks.
@@ -388,9 +387,9 @@ def opa_decision(
"""
async def _opa_decision(
request: Request,
user_info: OIDCUserModel = Depends(oidc_security), # noqa: B008
async_request: AsyncClient = Depends(_make_async_client), # noqa: B008
request: Request,
user_info: OIDCUserModel = Depends(oidc_security), # noqa: B008
async_request: AsyncClient = Depends(_make_async_client), # noqa: B008
) -> bool | None:
"""Check OIDCUserModel against the OPA policy.
Loading