Skip to content
Snippets Groups Projects
test_oidc_policy_helper.py 9.93 KiB
from http import HTTPStatus
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest
from fastapi import HTTPException, Request
from httpx import AsyncClient, NetworkError, Response

from gso.auth.oidc_policy_helper import (
    OIDCConfig,
    OIDCUser,
    OIDCUserModel,
    OPAResult,
    _evaluate_decision,
    _get_decision,
    opa_decision,
)
from gso.auth.settings import oauth2lib_settings


@pytest.fixture(scope="module", autouse=True)
def _enable_oath2(_database, db_uri):
    oauth2lib_settings.OAUTH2_ACTIVE = True

    yield

    oauth2lib_settings.OAUTH2_ACTIVE = False


@pytest.fixture()
def mock_openid_config():
    return {
        "issuer": "https://example.proxy.aai.geant.org",
        "authorization_endpoint": "https://example.proxy.aai.geant.org/auth",
        "token_endpoint": "https://example.proxy.aai.geant.org/token",
        "userinfo_endpoint": "https://example.proxy.aai.geant.org/userinfo",
        "introspect_endpoint": "https://example.proxy.aai.geant.org/introspect",
        "jwks_uri": "https://example.proxy.aai.geant.org/jwks",
        "response_types_supported": ["code"],
        "response_modes_supported": ["query"],
        "grant_types_supported": ["authorization_code"],
        "subject_types_supported": ["public"],
        "id_token_signing_alg_values_supported": ["RS256"],
        "scopes_supported": ["openid"],
        "token_endpoint_auth_methods_supported": ["client_secret_basic"],
        "claims_supported": ["sub", "name", "email"],
        "claims_parameter_supported": True,
        "request_parameter_supported": True,
        "code_challenge_methods_supported": ["S256"],
    }


@pytest.fixture()
def oidc_user(mock_openid_config):
    user = OIDCUser(
        openid_url="https://example.proxy.aai.geant.org",
        resource_server_id="resource_server",
        resource_server_secret="secret",
    )
    user.openid_config = OIDCConfig.parse_obj(mock_openid_config)
    return user


# Fixture for the Request object


@pytest.fixture()
def mock_request():
    request = Mock(spec=Request)
    request.method = "GET"
    request.url.path = "/some/path"
    request.json = AsyncMock(return_value={"key": "value"})  # Mock JSON body
    request.path_params = {}
    request.query_params = {}
    request.headers.get = Mock(return_value="Bearer testtoken1212121")
    return request


@pytest.fixture()
def mock_oidc_user():
    oidc_user = AsyncMock(
        OIDCUser, openid_url="https://example.com", resource_server_id="test", resource_server_secret="secret"
    )
    oidc_user.__call__ = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
    return oidc_user


@pytest.fixture()
def mock_async_client():
    return AsyncClient(verify=False)  # noqa: S501


@pytest.mark.asyncio()
async def test_introspect_token_success(oidc_user, mock_async_client):
    mock_response_data = {"active": True, "sub": "123"}
    mock_async_client.post = AsyncMock(return_value=Response(200, json=mock_response_data))

    result = await oidc_user.introspect_token(mock_async_client, "test_token")

    assert result == mock_response_data


@pytest.mark.asyncio()
async def test_introspect_token_json_decode_error(oidc_user, mock_async_client):
    mock_async_client.post = AsyncMock(return_value=Response(200, content=b"not a json"))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_introspect_token_http_error(oidc_user, mock_async_client):
    mock_async_client.post = AsyncMock(return_value=Response(400, json={"error": "invalid_request"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_introspect_token_unauthorized(oidc_user, mock_async_client):
    mock_async_client.post = AsyncMock(return_value=Response(401, json={"detail": "Invalid token"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "Invalid token" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_userinfo_success(oidc_user, mock_async_client):
    mock_response = {"sub": "1234", "name": "John Doe", "email": "johndoe@example.com"}
    mock_async_client.post = AsyncMock(return_value=Response(200, json=mock_response))

    response = await oidc_user.userinfo(mock_async_client, "test_token")

    assert isinstance(response, OIDCUserModel)
    assert response["sub"] == "1234"
    assert response["name"] == "John Doe"
    assert response["email"] == "johndoe@example.com"


@pytest.mark.asyncio()
async def test_opa_decision_success(mock_request, mock_async_client):
    mock_user_info = OIDCUserModel({"sub": "123", "name": "John Doe", "email": "johndoe@example.com"})

    mock_oidc_user = AsyncMock(spec=OIDCUser)
    mock_oidc_user.return_value = AsyncMock(return_value=mock_user_info)

    with patch(
        "gso.auth.oidc_policy_helper._get_decision",
        return_value=AsyncMock(return_value=OPAResult(result=True, decision_id="1234")),
    ):
        decision_function = opa_decision("http://mock-opa-url", oidc_security=mock_oidc_user)

        result = await decision_function(mock_request, mock_user_info, mock_async_client)

        assert result is True


@pytest.mark.asyncio()
async def test_userinfo_unauthorized(oidc_user, mock_async_client):
    mock_async_client.post = AsyncMock(return_value=Response(401, json={"detail": "Invalid token"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "Invalid token" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_userinfo_json_decode_error(oidc_user, mock_async_client):
    mock_async_client.post = AsyncMock(return_value=Response(200, text="not a json"))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_get_decision_success(mock_async_client):
    mock_async_client.post = AsyncMock(
        return_value=Response(200, json={"result": {"allow": True}, "decision_id": "123"})
    )

    opa_url = "http://mock-opa-url"
    opa_input = {"some_input": "value"}
    decision = await _get_decision(mock_async_client, opa_url, opa_input)

    assert decision.result is True
    assert decision.decision_id == "123"


@pytest.mark.asyncio()
async def test_get_decision_network_error(mock_async_client):
    mock_async_client.post = AsyncMock(side_effect=NetworkError("Network error"))

    opa_url = "http://mock-opa-url"
    opa_input = {"some_input": "value"}

    with pytest.raises(HTTPException) as exc_info:
        await _get_decision(mock_async_client, opa_url, opa_input)

    assert exc_info.value.status_code == HTTPStatus.SERVICE_UNAVAILABLE
    assert exc_info.value.detail == "Policy agent is unavailable"


def test_evaluate_decision_allow():
    decision = OPAResult(result=True, decision_id="123")
    result = _evaluate_decision(decision, auto_error=True)

    assert result is True


def test_evaluate_decision_deny_without_auto_error():
    decision = OPAResult(result=False, decision_id="123")
    result = _evaluate_decision(decision, auto_error=False)

    assert result is False


def test_evaluate_decision_deny_with_auto_error():
    decision = OPAResult(result=False, decision_id="123")

    with pytest.raises(HTTPException) as exc_info:
        _evaluate_decision(decision, auto_error=True)

    assert exc_info.value.status_code == HTTPStatus.FORBIDDEN
    assert "Decision was taken with id: 123" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_oidc_user_call_with_token(oidc_user, mock_request, mock_async_client):
    oidc_user.introspect_token = AsyncMock(return_value={"active": True})
    oidc_user.userinfo = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))

    result = await oidc_user.__call__(mock_request, token="test_token")

    assert isinstance(result, OIDCUserModel)
    assert result["sub"] == "123"
    assert result["name"] == "John Doe"


@pytest.mark.asyncio()
async def test_oidc_user_call_inactive_token(oidc_user, mock_request, mock_async_client):
    oidc_user.introspect_token = AsyncMock(return_value={"active": False})

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.__call__(mock_request, token="test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "User is not active" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_oidc_user_call_no_token(oidc_user, mock_request):
    with (
        patch("fastapi.security.http.HTTPBearer.__call__", return_value=None),
        patch("httpx.AsyncClient.post", new_callable=MagicMock) as mock_post,
        patch("httpx.AsyncClient.get", new_callable=MagicMock) as mock_get,
    ):
        mock_post.return_value = MagicMock(status_code=200, json=lambda: {"active": False})
        mock_get.return_value = MagicMock(status_code=200, json=lambda: {})

        result = await oidc_user.__call__(mock_request)

    assert result is None


@pytest.mark.asyncio()
async def test_oidc_user_call_token_from_request(oidc_user, mock_request, mock_async_client):
    mock_request.state.credentials = Mock()
    mock_request.state.credentials.credentials = "request_token"

    oidc_user.introspect_token = AsyncMock(return_value={"active": True})
    oidc_user.userinfo = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))

    result = await oidc_user.__call__(mock_request)

    assert isinstance(result, OIDCUserModel)
    assert result["sub"] == "123"
    assert result["name"] == "John Doe"