-
Mohammad Torkashvand authoredMohammad Torkashvand authored
test_oidc_policy_helper.py 9.93 KiB
from http import HTTPStatus
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from fastapi import HTTPException, Request
from httpx import AsyncClient, NetworkError, Response
from gso.auth.oidc_policy_helper import (
OIDCConfig,
OIDCUser,
OIDCUserModel,
OPAResult,
_evaluate_decision,
_get_decision,
opa_decision,
)
from gso.auth.settings import oauth2lib_settings
@pytest.fixture(scope="module", autouse=True)
def _enable_oath2(_database, db_uri):
oauth2lib_settings.OAUTH2_ACTIVE = True
yield
oauth2lib_settings.OAUTH2_ACTIVE = False
@pytest.fixture()
def mock_openid_config():
return {
"issuer": "https://example.proxy.aai.geant.org",
"authorization_endpoint": "https://example.proxy.aai.geant.org/auth",
"token_endpoint": "https://example.proxy.aai.geant.org/token",
"userinfo_endpoint": "https://example.proxy.aai.geant.org/userinfo",
"introspect_endpoint": "https://example.proxy.aai.geant.org/introspect",
"jwks_uri": "https://example.proxy.aai.geant.org/jwks",
"response_types_supported": ["code"],
"response_modes_supported": ["query"],
"grant_types_supported": ["authorization_code"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"scopes_supported": ["openid"],
"token_endpoint_auth_methods_supported": ["client_secret_basic"],
"claims_supported": ["sub", "name", "email"],
"claims_parameter_supported": True,
"request_parameter_supported": True,
"code_challenge_methods_supported": ["S256"],
}
@pytest.fixture()
def oidc_user(mock_openid_config):
user = OIDCUser(
openid_url="https://example.proxy.aai.geant.org",
resource_server_id="resource_server",
resource_server_secret="secret",
)
user.openid_config = OIDCConfig.parse_obj(mock_openid_config)
return user
# Fixture for the Request object
@pytest.fixture()
def mock_request():
request = Mock(spec=Request)
request.method = "GET"
request.url.path = "/some/path"
request.json = AsyncMock(return_value={"key": "value"}) # Mock JSON body
request.path_params = {}
request.query_params = {}
request.headers.get = Mock(return_value="Bearer testtoken1212121")
return request
@pytest.fixture()
def mock_oidc_user():
oidc_user = AsyncMock(
OIDCUser, openid_url="https://example.com", resource_server_id="test", resource_server_secret="secret"
)
oidc_user.__call__ = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
return oidc_user
@pytest.fixture()
def mock_async_client():
return AsyncClient(verify=False) # noqa: S501
@pytest.mark.asyncio()
async def test_introspect_token_success(oidc_user, mock_async_client):
mock_response_data = {"active": True, "sub": "123"}
mock_async_client.post = AsyncMock(return_value=Response(200, json=mock_response_data))
result = await oidc_user.introspect_token(mock_async_client, "test_token")
assert result == mock_response_data
@pytest.mark.asyncio()
async def test_introspect_token_json_decode_error(oidc_user, mock_async_client):
mock_async_client.post = AsyncMock(return_value=Response(200, content=b"not a json"))
with pytest.raises(HTTPException) as exc_info:
await oidc_user.introspect_token(mock_async_client, "test_token")
assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_introspect_token_http_error(oidc_user, mock_async_client):
mock_async_client.post = AsyncMock(return_value=Response(400, json={"error": "invalid_request"}))
with pytest.raises(HTTPException) as exc_info:
await oidc_user.introspect_token(mock_async_client, "test_token")
assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_introspect_token_unauthorized(oidc_user, mock_async_client):
mock_async_client.post = AsyncMock(return_value=Response(401, json={"detail": "Invalid token"}))
with pytest.raises(HTTPException) as exc_info:
await oidc_user.introspect_token(mock_async_client, "test_token")
assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
assert "Invalid token" in str(exc_info.value.detail)
@pytest.mark.asyncio()
async def test_userinfo_success(oidc_user, mock_async_client):
mock_response = {"sub": "1234", "name": "John Doe", "email": "johndoe@example.com"}
mock_async_client.post = AsyncMock(return_value=Response(200, json=mock_response))
response = await oidc_user.userinfo(mock_async_client, "test_token")
assert isinstance(response, OIDCUserModel)
assert response["sub"] == "1234"
assert response["name"] == "John Doe"
assert response["email"] == "johndoe@example.com"
@pytest.mark.asyncio()
async def test_opa_decision_success(mock_request, mock_async_client):
mock_user_info = OIDCUserModel({"sub": "123", "name": "John Doe", "email": "johndoe@example.com"})
mock_oidc_user = AsyncMock(spec=OIDCUser)
mock_oidc_user.return_value = AsyncMock(return_value=mock_user_info)
with patch(
"gso.auth.oidc_policy_helper._get_decision",
return_value=AsyncMock(return_value=OPAResult(result=True, decision_id="1234")),
):
decision_function = opa_decision("http://mock-opa-url", oidc_security=mock_oidc_user)
result = await decision_function(mock_request, mock_user_info, mock_async_client)
assert result is True
@pytest.mark.asyncio()
async def test_userinfo_unauthorized(oidc_user, mock_async_client):
mock_async_client.post = AsyncMock(return_value=Response(401, json={"detail": "Invalid token"}))
with pytest.raises(HTTPException) as exc_info:
await oidc_user.userinfo(mock_async_client, "test_token")
assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
assert "Invalid token" in str(exc_info.value.detail)
@pytest.mark.asyncio()
async def test_userinfo_json_decode_error(oidc_user, mock_async_client):
mock_async_client.post = AsyncMock(return_value=Response(200, text="not a json"))
with pytest.raises(HTTPException) as exc_info:
await oidc_user.userinfo(mock_async_client, "test_token")
assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_get_decision_success(mock_async_client):
mock_async_client.post = AsyncMock(
return_value=Response(200, json={"result": {"allow": True}, "decision_id": "123"})
)
opa_url = "http://mock-opa-url"
opa_input = {"some_input": "value"}
decision = await _get_decision(mock_async_client, opa_url, opa_input)
assert decision.result is True
assert decision.decision_id == "123"
@pytest.mark.asyncio()
async def test_get_decision_network_error(mock_async_client):
mock_async_client.post = AsyncMock(side_effect=NetworkError("Network error"))
opa_url = "http://mock-opa-url"
opa_input = {"some_input": "value"}
with pytest.raises(HTTPException) as exc_info:
await _get_decision(mock_async_client, opa_url, opa_input)
assert exc_info.value.status_code == HTTPStatus.SERVICE_UNAVAILABLE
assert exc_info.value.detail == "Policy agent is unavailable"
def test_evaluate_decision_allow():
decision = OPAResult(result=True, decision_id="123")
result = _evaluate_decision(decision, auto_error=True)
assert result is True
def test_evaluate_decision_deny_without_auto_error():
decision = OPAResult(result=False, decision_id="123")
result = _evaluate_decision(decision, auto_error=False)
assert result is False
def test_evaluate_decision_deny_with_auto_error():
decision = OPAResult(result=False, decision_id="123")
with pytest.raises(HTTPException) as exc_info:
_evaluate_decision(decision, auto_error=True)
assert exc_info.value.status_code == HTTPStatus.FORBIDDEN
assert "Decision was taken with id: 123" in str(exc_info.value.detail)
@pytest.mark.asyncio()
async def test_oidc_user_call_with_token(oidc_user, mock_request, mock_async_client):
oidc_user.introspect_token = AsyncMock(return_value={"active": True})
oidc_user.userinfo = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
result = await oidc_user.__call__(mock_request, token="test_token")
assert isinstance(result, OIDCUserModel)
assert result["sub"] == "123"
assert result["name"] == "John Doe"
@pytest.mark.asyncio()
async def test_oidc_user_call_inactive_token(oidc_user, mock_request, mock_async_client):
oidc_user.introspect_token = AsyncMock(return_value={"active": False})
with pytest.raises(HTTPException) as exc_info:
await oidc_user.__call__(mock_request, token="test_token")
assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
assert "User is not active" in str(exc_info.value.detail)
@pytest.mark.asyncio()
async def test_oidc_user_call_no_token(oidc_user, mock_request):
with (
patch("fastapi.security.http.HTTPBearer.__call__", return_value=None),
patch("httpx.AsyncClient.post", new_callable=MagicMock) as mock_post,
patch("httpx.AsyncClient.get", new_callable=MagicMock) as mock_get,
):
mock_post.return_value = MagicMock(status_code=200, json=lambda: {"active": False})
mock_get.return_value = MagicMock(status_code=200, json=lambda: {})
result = await oidc_user.__call__(mock_request)
assert result is None
@pytest.mark.asyncio()
async def test_oidc_user_call_token_from_request(oidc_user, mock_request, mock_async_client):
mock_request.state.credentials = Mock()
mock_request.state.credentials.credentials = "request_token"
oidc_user.introspect_token = AsyncMock(return_value={"active": True})
oidc_user.userinfo = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
result = await oidc_user.__call__(mock_request)
assert isinstance(result, OIDCUserModel)
assert result["sub"] == "123"
assert result["name"] == "John Doe"