Skip to content
Snippets Groups Projects
Commit 4939e5d2 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand Committed by Neda Moeini
Browse files

Add API key authentication for routes

parent 26fdf124
No related branches found
No related tags found
1 merge request!149Add API key authentication for routes
......@@ -8,13 +8,13 @@ from orchestrator.domain import SubscriptionModel
from orchestrator.schemas import SubscriptionDomainModelSchema
from orchestrator.services.subscriptions import build_extended_domain_model
from gso.auth.security import opa_security_default
from gso.auth.api_key_auth import get_api_key
from gso.services.subscriptions import get_active_router_subscriptions
router = APIRouter(
prefix="/subscriptions",
tags=["Subscriptions"],
dependencies=[Depends(opa_security_default)],
dependencies=[Depends(get_api_key)],
)
......
"""Manage API key validation for FastAPI routes."""
from fastapi import Depends, HTTPException, status
from fastapi.security.api_key import APIKeyHeader
from gso.settings import load_oss_params
API_KEY_NAME = "access_token"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=True)
async def get_api_key(api_key: str = Depends(api_key_header)) -> str:
"""Validate the provided API key against known third-party keys and returns it if valid, else raises HTTP 403."""
settings = load_oss_params()
# TODO: This is a simulated database of API keys which should be replace with a real one
if api_key in settings.THIRD_PARTY_API_KEYS:
return api_key
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid API Key")
......@@ -73,5 +73,9 @@
"broker_url": "redis://localhost:6379/0",
"result_backend": "rpc://localhost:6379/0",
"result_expires": 3600
},
"THIRD_PARTY_API_KEYS": {
"REALLY_random_AND_secure_T0keN": "Ansible Dynamic Inventory Generator",
"Another_REALLY_random_AND_secure_T0keN": "Application 2"
}
}
......@@ -161,6 +161,7 @@ class OSSParams(BaseSettings):
MONITORING: MonitoringParams
PROVISIONING_PROXY: ProvisioningProxyParams
CELERY: CeleryParams
THIRD_PARTY_API_KEYS: dict[str, str]
def load_oss_params() -> OSSParams:
......
......@@ -3,14 +3,28 @@ from orchestrator.types import SubscriptionLifecycle
ROUTER_SUBSCRIPTION_ENDPOINT = "/api/v1/subscriptions/routers"
def test_router_subscriptions_endpoint(test_client, nokia_router_subscription_factory):
def test_router_subscriptions_endpoint_with_valid_api_key(test_client, nokia_router_subscription_factory):
nokia_router_subscription_factory()
nokia_router_subscription_factory()
nokia_router_subscription_factory()
nokia_router_subscription_factory(status=SubscriptionLifecycle.TERMINATED)
nokia_router_subscription_factory(status=SubscriptionLifecycle.INITIAL)
response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT)
response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT, headers={"access_token": "REALY_random_AND_3cure_T0keN"})
assert response.status_code == 200
assert len(response.json()) == 3
def test_router_subscriptions_endpoint_with_invalid_api_key(test_client, nokia_router_subscription_factory):
response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT, headers={"access_token": "fake_invalid_api_key"})
assert response.status_code == 403
assert response.json() == {"detail": "Invalid API Key"}
def test_router_subscriptions_endpoint_without_api_key(test_client, nokia_router_subscription_factory):
response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT)
assert response.status_code == 403
assert response.json() == {"detail": "Not authenticated"}
......@@ -204,6 +204,10 @@ def configuration_data() -> dict:
"result_backend": "rpc://localhost:6379/0",
"result_expires": 3600,
},
"THIRD_PARTY_API_KEYS": {
"REALY_random_AND_3cure_T0keN": "LSO",
"another_REALY_random_AND_3cure_T0keN": "Application 2",
},
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment