-
Mohammad Torkashvand authored
opa.py 1.90 KiB
"""Module contains the OPA authorization class that is used to get decisions from the OPA server."""
from http import HTTPStatus

from fastapi.exceptions import HTTPException
from httpx import AsyncClient, NetworkError, TimeoutException
from oauth2_lib.fastapi import GraphQLOPAAuthorization, OPAAuthorization, OPAResult
from oauth2_lib.settings import oauth2lib_settings
from structlog import get_logger
# Module-level structlog logger, named after this module.
logger = get_logger(__name__)
async def _get_decision(opa_url: str, async_request: AsyncClient, opa_input: dict) -> OPAResult:
    """Post ``opa_input`` to the policy agent and return its decision.

    Args:
        opa_url: URL the input document is posted to.
        async_request: HTTP client used to perform the POST.
        opa_input: Input document, sent as the JSON request body.

    Returns:
        An ``OPAResult`` built from the ``decision_id`` and ``result.allow``
        fields of the JSON response.

    Raises:
        HTTPException: With status 503 (Service Unavailable) when the request
            fails with a network error, a timeout, or a ``TypeError``.
    """
    logger.debug("Posting input json to Policy agent", opa_url=opa_url, input=opa_input)

    # httpx's TimeoutException is a sibling of NetworkError (both derive from
    # TransportError), so it has to be listed explicitly: otherwise a slow
    # policy agent escapes as an unhandled error instead of a 503.
    try:
        response = await async_request.post(opa_url, json=opa_input)
    except (NetworkError, TimeoutException, TypeError) as exc:
        logger.debug("Could not get decision from policy agent", error=str(exc))
        raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable") from exc

    # NOTE(review): the lookups below are unguarded; a response body without
    # "decision_id" or "result"/"allow" raises here. Confirm whether that case
    # should be handled explicitly.
    decision = response.json()
    logger.debug("Received decision from policy agent", decision=decision)
    return OPAResult(decision_id=decision["decision_id"], result=decision["result"]["allow"])
class OPAAuthZ(OPAAuthorization):
    """OPA authorization for HTTP requests; decisions are fetched through ``_get_decision``."""

    async def get_decision(self, async_request: AsyncClient, opa_input: dict) -> OPAResult:
        """Request a decision for ``opa_input`` from the OPA server at ``self.opa_url``."""
        return await _get_decision(
            opa_url=self.opa_url,
            async_request=async_request,
            opa_input=opa_input,
        )
class GraphQLOPAAuthZ(GraphQLOPAAuthorization):
    """OPA authorization for GraphQL operations; decisions are fetched through ``_get_decision``."""

    async def get_decision(self, async_request: AsyncClient, opa_input: dict) -> OPAResult:
        """Request a decision for ``opa_input`` from the OPA server at ``self.opa_url``."""
        decision = await _get_decision(self.opa_url, async_request, opa_input)
        return decision
# Module-level authorization instances; both are configured with the OPA URL
# taken from the oauth2-lib settings.
opa_instance = OPAAuthZ(opa_url=oauth2lib_settings.OPA_URL)
graphql_opa_instance = GraphQLOPAAuthZ(opa_url=oauth2lib_settings.OPA_URL)