"""Tests for the OIDC authentication and OPA decision helpers in ``gso.auth``."""

from http import HTTPStatus
from unittest.mock import AsyncMock, Mock

import pytest
from fastapi import HTTPException, Request
from httpx import AsyncClient, NetworkError, Response
from oauth2_lib.fastapi import OIDCConfig
from oauth2_lib.settings import oauth2lib_settings

from gso.auth.oidc import (
    OIDCAuthentication,
    OIDCUserModel,
    _is_callback_step_endpoint,
)
from gso.auth.opa import _get_decision


@pytest.fixture(scope="module", autouse=True)
def _enable_oath2(_database, db_uri):
    """Switch ``oauth2lib_settings.OAUTH2_ACTIVE`` on for the duration of this module."""
    oauth2lib_settings.OAUTH2_ACTIVE = True

    yield

    # NOTE(review): this resets the flag to False rather than to whatever value it had
    # before the module ran - confirm that is what the rest of the suite expects.
    oauth2lib_settings.OAUTH2_ACTIVE = False


@pytest.fixture()
def mock_openid_config():
    """Return a dict of OpenID provider metadata used to build an ``OIDCConfig``."""
    return {
        "issuer": "https://example.proxy.aai.geant.org",
        "authorization_endpoint": "https://example.proxy.aai.geant.org/auth",
        "token_endpoint": "https://example.proxy.aai.geant.org/token",
        "userinfo_endpoint": "https://example.proxy.aai.geant.org/userinfo",
        "introspect_endpoint": "https://example.proxy.aai.geant.org/introspect",
        "jwks_uri": "https://example.proxy.aai.geant.org/jwks",
        "response_types_supported": ["code"],
        "response_modes_supported": ["query"],
        "grant_types_supported": ["authorization_code"],
        "subject_types_supported": ["public"],
        "id_token_signing_alg_values_supported": ["RS256"],
        "scopes_supported": ["openid"],
        "token_endpoint_auth_methods_supported": ["client_secret_basic"],
        "claims_supported": ["sub", "name", "email"],
        "claims_parameter_supported": True,
        "request_parameter_supported": True,
        "code_challenge_methods_supported": ["S256"],
    }


@pytest.fixture()
def oidc_user(mock_openid_config):
    """Return an ``OIDCAuthentication`` whose ``openid_config`` is pre-populated.

    The config is validated from ``mock_openid_config`` and assigned directly on the
    instance.
    """
    user = OIDCAuthentication(
        openid_url="https://example.proxy.aai.geant.org",
        resource_server_id="resource_server",
        resource_server_secret="secret",  # noqa: S106
        openid_config_url="https://example.proxy.aai.geant.org/.well-known/openid-configuration",
        oidc_user_model_cls=OIDCUserModel,
    )
    user.openid_config = OIDCConfig.model_validate(mock_openid_config)
    return user


@pytest.fixture()
def mock_request():
    """Return a ``Mock`` specced on ``Request`` describing a GET with a bearer token header."""
    request = Mock(spec=Request)
    request.method = "GET"
    request.url.path = "/some/path"
    request.json = AsyncMock(return_value={"key": "value"})
    request.path_params = {}
    request.query_params = {}
    request.headers.get = Mock(return_value="Bearer testtoken1212121")
    return request


@pytest.fixture()
def mock_oidc_user():
    """Return an ``AsyncMock`` specced on ``OIDCAuthentication`` whose call yields a fixed user."""
    oidc_user = AsyncMock(
        OIDCAuthentication,
        openid_url="https://example.com",
        resource_server_id="test",
        resource_server_secret="secret",  # noqa: S106
    )
    oidc_user.__call__ = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
    return oidc_user


@pytest.fixture()
def mock_async_client():
    """Return an ``AsyncClient`` with TLS verification disabled.

    Every test in this module replaces ``post`` on the returned client with an
    ``AsyncMock`` before using it.
    """
    return AsyncClient(verify=False)  # noqa: S501


@pytest.mark.asyncio()
async def test_introspect_token_success(oidc_user, mock_async_client):
    """``introspect_token`` returns the JSON body of a 200 response with an active token."""
    mock_response_data = {"active": True, "sub": "123"}
    mock_async_client.post = AsyncMock(return_value=Response(200, json=mock_response_data))

    result = await oidc_user.introspect_token(mock_async_client, "test_token")

    assert result == mock_response_data


@pytest.mark.asyncio()
async def test_introspect_token_json_decode_error(oidc_user, mock_async_client):
    """A 200 response whose body is not JSON makes ``introspect_token`` raise a 401."""
    mock_async_client.post = AsyncMock(return_value=Response(200, content=b"not a json"))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_introspect_token_http_error(oidc_user, mock_async_client):
    """A 400 response makes ``introspect_token`` raise a 401."""
    mock_async_client.post = AsyncMock(return_value=Response(400, json={"error": "invalid_request"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_introspect_token_unauthorized(oidc_user, mock_async_client):
    """A 401 response makes ``introspect_token`` raise a 401 carrying the upstream detail."""
    mock_async_client.post = AsyncMock(return_value=Response(401, json={"detail": "Invalid token"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "Invalid token" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_userinfo_success(oidc_user, mock_async_client):
    """``userinfo`` returns an ``OIDCUserModel`` built from the second (userinfo) response."""
    mock_response_introspect_token = {
        "active": True,
        "scope": "openid profile email aarc",
        "client_id": "APP-775F0BD8-B1D7-4936-BE2C-A300A6509F00",
        "exp": 1721395275,
        "iat": 1721391675,
        "sub": "ed145263-b652-3d4x-8f96-4abae9c98124@aai.geant.org",
        "iss": "https://proxy.aai.geant.org",
        "token_type": "Bearer",
        "aud": ["APP-775F0BD8-B1D7-4936-BE2C-A300A6509F00"],
    }
    mock_response_userinfo = {"sub": "1234", "name": "John Doe", "email": "johndoe@example.com"}

    # Two consecutive POSTs are answered in order: introspection first, then userinfo.
    mock_async_client.post = AsyncMock(
        side_effect=[
            Response(200, json=mock_response_introspect_token),
            Response(200, json=mock_response_userinfo),
        ]
    )

    response = await oidc_user.userinfo(mock_async_client, "test_token")

    assert isinstance(response, OIDCUserModel)
    assert response["sub"] == "1234"
    assert response["name"] == "John Doe"
    assert response["email"] == "johndoe@example.com"


@pytest.mark.asyncio()
async def test_userinfo_success_client_credential(oidc_user, mock_async_client):
    """An introspection payload without ``sub`` is expected to yield only the ``client_id``."""
    mock_response_introspect_token = {
        "active": True,
        "scope": "openid profile email eduperson_assurance eduperson_entitlement entitlements",
        "client_id": "APP-48A7986C-8776-49D9-AB40-52EFBD432AB1",
        "exp": 1721395701,
        "iat": 1721392101,
        "iss": "https://proxy.aai.geant.org",
        "token_type": "Bearer",
    }

    mock_async_client.post = AsyncMock(
        return_value=Response(200, json=mock_response_introspect_token),
    )

    response = await oidc_user.userinfo(mock_async_client, "test_token")

    assert isinstance(response, OIDCUserModel)
    assert response == {"client_id": "APP-48A7986C-8776-49D9-AB40-52EFBD432AB1"}


@pytest.mark.asyncio()
async def test_userinfo_unauthorized(oidc_user, mock_async_client):
    """A 401 response makes ``userinfo`` raise a 401 carrying the upstream detail."""
    mock_async_client.post = AsyncMock(return_value=Response(401, json={"detail": "Invalid token"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "Invalid token" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_userinfo_json_decode_error(oidc_user, mock_async_client):
    """A 200 response whose body is not JSON makes ``userinfo`` raise a 401."""
    mock_async_client.post = AsyncMock(return_value=Response(200, text="not a json"))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, "test_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_get_decision_success(mock_async_client):
    """``_get_decision`` exposes the ``allow`` flag and ``decision_id`` of a 200 response."""
    mock_async_client.post = AsyncMock(
        return_value=Response(200, json={"result": {"allow": True}, "decision_id": "123"})
    )

    opa_url = "http://mock-opa-url"
    opa_input = {"some_input": "value"}
    decision = await _get_decision(opa_url, mock_async_client, opa_input)

    assert decision.result is True
    assert decision.decision_id == "123"


@pytest.mark.asyncio()
async def test_get_decision_network_error(mock_async_client):
    """A ``NetworkError`` from the client makes ``_get_decision`` raise a 503."""
    mock_async_client.post = AsyncMock(side_effect=NetworkError("Network error"))

    opa_url = "http://mock-opa-url"
    opa_input = {"some_input": "value"}

    with pytest.raises(HTTPException) as exc_info:
        await _get_decision(opa_url, mock_async_client, opa_input)

    assert exc_info.value.status_code == HTTPStatus.SERVICE_UNAVAILABLE
    assert exc_info.value.detail == "Policy agent is unavailable"


@pytest.mark.asyncio()
async def test_oidc_user_call_inactive_token(oidc_user, mock_async_client):
    """An introspection payload with ``active: False`` makes ``userinfo`` raise a 401."""
    mock_async_client.post = AsyncMock(return_value=Response(200, json={"active": False, "sub": "123"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, token="test_token")  # noqa: S106

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "User is not active" in str(exc_info.value.detail)


@pytest.mark.parametrize(
    ("path", "expected"),
    [
        (
            "/api/processes/daa171b3-7a76-4ac5-9528-11aefa5a6222/callback/9MS2tkFLl-TvWUHD2yhftfFSnPLR-koQolXBeG8OE-o",
            True,
        ),
        ("/api/some/other/path", False),
    ],
)
def test_is_callback_step_endpoint(path, expected):
    """``_is_callback_step_endpoint`` is True for the callback path and False for the other path."""
    request = Request(
        scope={
            "type": "http",
            "path": path,
            "headers": [(b"host", b"example.com")],
        }
    )

    assert _is_callback_step_endpoint(request) is expected


@pytest.mark.asyncio()
async def test_userinfo_invalid_token(oidc_user, mock_async_client):
    """A 401 response with an ``invalid_token`` error makes ``userinfo`` raise a 401."""
    mock_async_client.post = AsyncMock(return_value=Response(401, json={"error": "invalid_token"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, "invalid_token")

    # HTTPStatus is an IntEnum, so this is the same comparison as ``== 401`` while
    # matching the style used by the other tests in this module.
    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED


@pytest.mark.asyncio()
async def test_introspect_token_missing_active_key(oidc_user, mock_async_client):
    """An empty JSON payload makes ``introspect_token`` raise a 401 ("Missing active key")."""
    mock_async_client.post = AsyncMock(return_value=Response(200, json={}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "Missing active key" in str(exc_info.value.detail)


@pytest.mark.asyncio()
async def test_introspect_token_inactive(oidc_user, mock_async_client):
    """A payload with ``active: False`` makes ``introspect_token`` raise a 401."""
    mock_async_client.post = AsyncMock(return_value=Response(200, json={"active": False}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "User is not active" in str(exc_info.value.detail)