-
Mohammad Torkashvand authored
test_oidc.py 10.10 KiB
from http import HTTPStatus
from unittest.mock import AsyncMock, Mock
import pytest
from fastapi import HTTPException, Request
from httpx import AsyncClient, NetworkError, Response
from oauth2_lib.fastapi import OIDCConfig
from gso.auth.oidc import (
OIDCAuthentication,
OIDCUserModel,
_is_callback_step_endpoint,
)
from gso.auth.opa import _get_decision
from gso.auth.settings import oauth2lib_settings
@pytest.fixture(scope="module", autouse=True)
def _enable_oath2(_database, db_uri):
    """Switch ``oauth2lib_settings.OAUTH2_ACTIVE`` on for every test in this module.

    ``_database`` and ``db_uri`` are not used in the body; requesting them makes pytest set those fixtures up
    before this one runs.

    The value found at setup time is restored on teardown (instead of unconditionally writing ``False``), so the
    shared settings object is left the way this module found it.
    """
    previous_state = oauth2lib_settings.OAUTH2_ACTIVE
    oauth2lib_settings.OAUTH2_ACTIVE = True
    yield
    # Put back whatever was configured before, so later test modules see the original setting.
    oauth2lib_settings.OAUTH2_ACTIVE = previous_state
@pytest.fixture()
def mock_openid_config():
    """Return a plain-dict OpenID discovery document pointing at an example proxy host."""
    # URLs published by the (fake) provider.
    endpoints = {
        "issuer": "https://example.proxy.aai.geant.org",
        "authorization_endpoint": "https://example.proxy.aai.geant.org/auth",
        "token_endpoint": "https://example.proxy.aai.geant.org/token",
        "userinfo_endpoint": "https://example.proxy.aai.geant.org/userinfo",
        "introspect_endpoint": "https://example.proxy.aai.geant.org/introspect",
        "jwks_uri": "https://example.proxy.aai.geant.org/jwks",
    }
    # Feature lists and flags advertised alongside the endpoints.
    capabilities = {
        "response_types_supported": ["code"],
        "response_modes_supported": ["query"],
        "grant_types_supported": ["authorization_code"],
        "subject_types_supported": ["public"],
        "id_token_signing_alg_values_supported": ["RS256"],
        "scopes_supported": ["openid"],
        "token_endpoint_auth_methods_supported": ["client_secret_basic"],
        "claims_supported": ["sub", "name", "email"],
        "claims_parameter_supported": True,
        "request_parameter_supported": True,
        "code_challenge_methods_supported": ["S256"],
    }
    return {**endpoints, **capabilities}
@pytest.fixture()
def oidc_user(mock_openid_config):
    """Build an ``OIDCAuthentication`` whose ``openid_config`` is pre-populated from ``mock_openid_config``."""
    authentication = OIDCAuthentication(
        openid_url="https://example.proxy.aai.geant.org",
        openid_config_url="https://example.proxy.aai.geant.org/.well-known/openid-configuration",
        resource_server_id="resource_server",
        resource_server_secret="secret",  # noqa: S106
        oidc_user_model_cls=OIDCUserModel,
    )
    # Assign the validated discovery document directly onto the instance.
    validated_config = OIDCConfig.model_validate(mock_openid_config)
    authentication.openid_config = validated_config
    return authentication
@pytest.fixture()
def mock_request():
    """Return a ``Mock`` specced on ``Request``: a GET to ``/some/path`` with a bearer token header."""
    fake_request = Mock(spec=Request)
    fake_request.configure_mock(
        method="GET",
        path_params={},
        query_params={},
        json=AsyncMock(return_value={"key": "value"}),
        # Dotted keys configure attributes on child mocks (``request.url.path``, ``request.headers.get``).
        **{
            "url.path": "/some/path",
            "headers.get": Mock(return_value="Bearer testtoken1212121"),
        },
    )
    return fake_request
@pytest.fixture()
def mock_oidc_user():
    """Return an ``AsyncMock`` specced on ``OIDCAuthentication`` that resolves to a fixed ``OIDCUserModel``.

    Awaiting ``mock_oidc_user(...)`` yields a user with ``sub == "123"`` and ``name == "John Doe"``.
    """
    john_doe = OIDCUserModel({"sub": "123", "name": "John Doe"})
    oidc_user = AsyncMock(
        OIDCAuthentication,
        openid_url="https://example.com",
        resource_server_id="test",
        resource_server_secret="secret",  # noqa: S106
        # Call syntax resolves ``__call__`` on the mock's type, so the awaited result has to be configured
        # through ``return_value``; an instance-level ``__call__`` attribute is never consulted by ``oidc_user()``.
        return_value=john_doe,
    )
    # Kept so code that invokes ``oidc_user.__call__(...)`` explicitly keeps getting the same user.
    oidc_user.__call__ = AsyncMock(return_value=john_doe)
    return oidc_user
@pytest.fixture()
def mock_async_client():
    """Return an ``AsyncClient`` created with ``verify=False``; the tests replace its ``post`` with a mock."""
    return AsyncClient(
        verify=False,  # noqa: S501
    )
@pytest.mark.asyncio()
async def test_introspect_token_success(oidc_user, mock_async_client):
    """``introspect_token`` returns the JSON body of a 200 response as-is."""
    expected_payload = {"active": True, "sub": "123"}
    introspection_reply = Response(200, json=expected_payload)
    mock_async_client.post = AsyncMock(return_value=introspection_reply)

    payload = await oidc_user.introspect_token(mock_async_client, "test_token")

    assert payload == expected_payload
@pytest.mark.asyncio()
async def test_introspect_token_json_decode_error(oidc_user, mock_async_client):
    """A 200 response whose body is not JSON makes ``introspect_token`` raise a 401 ``HTTPException``."""
    garbage_reply = Response(200, content=b"not a json")
    mock_async_client.post = AsyncMock(return_value=garbage_reply)

    with pytest.raises(HTTPException) as raised:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert raised.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_introspect_token_http_error(oidc_user, mock_async_client):
    """A 400 reply from the introspection endpoint is surfaced as a 401 ``HTTPException``."""
    bad_request_reply = Response(400, json={"error": "invalid_request"})
    mock_async_client.post = AsyncMock(return_value=bad_request_reply)

    with pytest.raises(HTTPException) as raised:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    assert raised.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_introspect_token_unauthorized(oidc_user, mock_async_client):
    """A 401 reply yields a 401 ``HTTPException`` whose detail mentions ``Invalid token``."""
    unauthorized_reply = Response(401, json={"detail": "Invalid token"})
    mock_async_client.post = AsyncMock(return_value=unauthorized_reply)

    with pytest.raises(HTTPException) as raised:
        await oidc_user.introspect_token(mock_async_client, "test_token")

    error = raised.value
    assert error.status_code == HTTPStatus.UNAUTHORIZED
    assert "Invalid token" in str(error.detail)
@pytest.mark.asyncio()
async def test_userinfo_success(oidc_user, mock_async_client):
    """``userinfo`` yields an ``OIDCUserModel`` carrying the claims of the second mocked reply.

    ``post`` is mocked to answer twice, in order: first with an active-token introspection payload that has a
    ``sub``, then with the user's claims. Presumably ``userinfo`` introspects before fetching the claims; this
    test only pins the resulting model.
    """
    introspection_payload = {
        "active": True,
        "scope": "openid profile email aarc",
        "client_id": "APP-775F0BD8-B1D7-4936-BE2C-A300A6509F0B",
        "exp": 1721395275,
        "iat": 1721391675,
        "sub": "ed145263-b652-3d4x-8f96-4abae9c98124@aai.geant.org",
        "iss": "https://proxy.aai.geant.org",
        "token_type": "Bearer",
        "aud": ["APP-775F0BD8-B1D7-4936-BE2C-A300A6509F0B"],
    }
    userinfo_payload = {"sub": "1234", "name": "John Doe", "email": "johndoe@example.com"}
    queued_replies = [
        Response(200, json=introspection_payload),
        Response(200, json=userinfo_payload),
    ]
    mock_async_client.post = AsyncMock(side_effect=queued_replies)

    user = await oidc_user.userinfo(mock_async_client, "test_token")

    assert isinstance(user, OIDCUserModel)
    # Every claim from the userinfo reply must be readable from the model.
    for claim, value in userinfo_payload.items():
        assert user[claim] == value
@pytest.mark.asyncio()
async def test_userinfo_success_client_credential(oidc_user, mock_async_client):
    """An introspection payload without ``sub`` makes ``userinfo`` return a model holding only ``client_id``."""
    client_id = "APP-48A7986C-8776-49D9-AB40-52EFBD432AB1"
    introspection_payload = {
        "active": True,
        "scope": "openid profile email eduperson_assurance eduperson_entitlement entitlements",
        "client_id": client_id,
        "exp": 1721395701,
        "iat": 1721392101,
        "iss": "https://proxy.aai.geant.org",
        "token_type": "Bearer",
    }
    introspection_reply = Response(200, json=introspection_payload)
    mock_async_client.post = AsyncMock(return_value=introspection_reply)

    user = await oidc_user.userinfo(mock_async_client, "test_token")

    assert isinstance(user, OIDCUserModel)
    assert user == {"client_id": client_id}
@pytest.mark.asyncio()
async def test_userinfo_unauthorized(oidc_user, mock_async_client):
    """A 401 reply makes ``userinfo`` raise a 401 ``HTTPException`` whose detail mentions ``Invalid token``."""
    unauthorized_reply = Response(401, json={"detail": "Invalid token"})
    mock_async_client.post = AsyncMock(return_value=unauthorized_reply)

    with pytest.raises(HTTPException) as raised:
        await oidc_user.userinfo(mock_async_client, "test_token")

    error = raised.value
    assert error.status_code == HTTPStatus.UNAUTHORIZED
    assert "Invalid token" in str(error.detail)
@pytest.mark.asyncio()
async def test_userinfo_json_decode_error(oidc_user, mock_async_client):
    """A 200 reply with a non-JSON text body makes ``userinfo`` raise a 401 ``HTTPException``."""
    garbage_reply = Response(200, text="not a json")
    mock_async_client.post = AsyncMock(return_value=garbage_reply)

    with pytest.raises(HTTPException) as raised:
        await oidc_user.userinfo(mock_async_client, "test_token")

    assert raised.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_get_decision_success(mock_async_client):
    """``_get_decision`` exposes ``result`` and ``decision_id`` from a successful policy-agent reply."""
    opa_reply = Response(200, json={"result": {"allow": True}, "decision_id": "123"})
    mock_async_client.post = AsyncMock(return_value=opa_reply)

    decision = await _get_decision("http://mock-opa-url", mock_async_client, {"some_input": "value"})

    assert decision.result is True
    assert decision.decision_id == "123"
@pytest.mark.asyncio()
async def test_get_decision_network_error(mock_async_client):
    """A ``NetworkError`` raised by ``post`` is reported as a 503 with a fixed detail message."""
    mock_async_client.post = AsyncMock(side_effect=NetworkError("Network error"))

    with pytest.raises(HTTPException) as raised:
        await _get_decision("http://mock-opa-url", mock_async_client, {"some_input": "value"})

    error = raised.value
    assert error.status_code == HTTPStatus.SERVICE_UNAVAILABLE
    assert error.detail == "Policy agent is unavailable"
@pytest.mark.asyncio()
async def test_oidc_user_call_inactive_token(oidc_user, mock_async_client):
    """An introspection payload with ``active: False`` makes ``userinfo`` raise 401 ``User is not active``."""
    inactive_reply = Response(200, json={"active": False, "sub": "123"})
    mock_async_client.post = AsyncMock(return_value=inactive_reply)

    with pytest.raises(HTTPException) as raised:
        await oidc_user.userinfo(mock_async_client, token="test_token")

    error = raised.value
    assert error.status_code == HTTPStatus.UNAUTHORIZED
    assert "User is not active" in str(error.detail)
@pytest.mark.parametrize(
    ("path", "expected"),
    [
        (
            "/api/processes/daa171b3-7a76-4ac5-9528-11aefa5a6222/callback/9MS2tkFLl-TvWUHD2yhftfFSnPLR-koQolXBeG8OE-o",
            True,
        ),
        ("/api/some/other/path", False),
    ],
)
def test_is_callback_step_endpoint(path, expected):
    """``_is_callback_step_endpoint`` is ``True`` for the process-callback path and ``False`` for another path."""
    # Minimal HTTP scope: just enough for ``Request`` to expose the path under test.
    asgi_scope = {
        "type": "http",
        "path": path,
        "headers": [(b"host", b"example.com")],
    }
    is_callback = _is_callback_step_endpoint(Request(scope=asgi_scope))

    assert is_callback is expected
@pytest.mark.asyncio()
async def test_userinfo_invalid_token(oidc_user, mock_async_client):
    """A 401 ``invalid_token`` reply makes ``userinfo`` raise a 401 ``HTTPException``.

    Uses ``@pytest.mark.asyncio()`` and ``HTTPStatus.UNAUTHORIZED`` like the other tests in this module, instead
    of the bare marker and the literal ``401``.
    """
    mock_async_client.post = AsyncMock(return_value=Response(401, json={"error": "invalid_token"}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.userinfo(mock_async_client, "invalid_token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
@pytest.mark.asyncio()
async def test_introspect_token_missing_active_key(oidc_user, mock_async_client):
    """An empty JSON introspection payload makes ``introspect_token`` raise 401 ``Missing active key``.

    Uses ``@pytest.mark.asyncio()`` and ``HTTPStatus.UNAUTHORIZED`` like the other tests in this module, instead
    of the bare marker and the literal ``401``.
    """
    mock_async_client.post = AsyncMock(return_value=Response(200, json={}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "Missing active key" in str(exc_info.value.detail)
@pytest.mark.asyncio()
async def test_introspect_token_inactive(oidc_user, mock_async_client):
    """An ``active: False`` introspection payload makes ``introspect_token`` raise 401 ``User is not active``.

    Uses ``@pytest.mark.asyncio()`` and ``HTTPStatus.UNAUTHORIZED`` like the other tests in this module, instead
    of the bare marker and the literal ``401``.
    """
    mock_async_client.post = AsyncMock(return_value=Response(200, json={"active": False}))

    with pytest.raises(HTTPException) as exc_info:
        await oidc_user.introspect_token(mock_async_client, "token")

    assert exc_info.value.status_code == HTTPStatus.UNAUTHORIZED
    assert "User is not active" in str(exc_info.value.detail)