"""Module contains the OPA authorization class that is used to get decisions from the OPA server.""" from http import HTTPStatus from fastapi.exceptions import HTTPException from httpx import AsyncClient, NetworkError from oauth2_lib.fastapi import GraphQLOPAAuthorization, OPAAuthorization, OPAResult from oauth2_lib.settings import oauth2lib_settings from structlog import get_logger logger = get_logger(__name__) async def _get_decision(opa_url: str, async_request: AsyncClient, opa_input: dict) -> OPAResult: logger.debug("Posting input json to Policy agent", opa_url=opa_url, input=opa_input) try: result = await async_request.post(opa_url, json=opa_input) except (NetworkError, TypeError) as exc: logger.debug("Could not get decision from policy agent", error=str(exc)) raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable") from exc json_result = result.json() logger.debug("Received decision from policy agent", decision=json_result) return OPAResult(decision_id=json_result["decision_id"], result=json_result["result"]["allow"]) class OPAAuthZ(OPAAuthorization): """Applies OPA decisions to HTTP requests for authorization.""" async def get_decision(self, async_request: AsyncClient, opa_input: dict) -> OPAResult: """Get the decision from the OPA server.""" return await _get_decision(self.opa_url, async_request, opa_input) class GraphQLOPAAuthZ(GraphQLOPAAuthorization): """Specializes OPA authorization for GraphQL operations.""" async def get_decision(self, async_request: AsyncClient, opa_input: dict) -> OPAResult: """Get the decision from the OPA server.""" return await _get_decision(self.opa_url, async_request, opa_input) opa_instance = OPAAuthZ( opa_url=oauth2lib_settings.OPA_URL, ) graphql_opa_instance = GraphQLOPAAuthZ( opa_url=oauth2lib_settings.OPA_URL, )