Skip to content
Snippets Groups Projects

Feature/nat 468 refactor auth

Merged Mohammad Torkashvand requested to merge feature/NAT-468-refactor-auth into develop
5 files
+ 69
8
Compare changes
  • Side-by-side
  • Inline
Files
5
gso/auth/opa.py 0 → 100644
+ 44
0
from http import HTTPStatus
from fastapi.exceptions import HTTPException
from fastapi.params import Depends
from httpx import AsyncClient, NetworkError
from oauth2_lib.fastapi import OIDCUserModel, OPAAuthorization, OPAResult
from oauth2_lib.settings import oauth2lib_settings
from starlette.requests import Request
from structlog import get_logger
from gso.auth.oidc import oidc_instance
logger = get_logger(__name__)
class OPAAuthorization(OPAAuthorization):
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(OPAAuthorization, cls).__new__(cls)
return cls._instance
async def authorize(
self, request: Request, user_info: OIDCUserModel = Depends(oidc_instance.authenticate)
) -> bool | None:
return await super().authorize(request, user_info)
async def get_decision(self, async_request: AsyncClient, opa_input: dict) -> OPAResult:
logger.debug("Posting input json to Policy agent", opa_url=self.opa_url, input=opa_input)
try:
result = await async_request.post(self.opa_url, json=opa_input)
except (NetworkError, TypeError) as exc:
logger.debug("Could not get decision from policy agent", error=str(exc))
raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable")
json_result = result.json()
logger.debug("Received decision from policy agent", decision=json_result)
return OPAResult(decision_id=json_result["decision_id"], result=json_result["result"]["allow"])
opa_instance = OPAAuthorization(
opa_url=oauth2lib_settings.OPA_URL,
)
Loading