from orchestrator.types import SubscriptionLifecycle

from gso.utils.shared_enums import Vendor

ROUTER_SUBSCRIPTION_ENDPOINT = "/api/v1/subscriptions/routers"
DASHBOARD_DEVICES_ENDPOINT = "/api/v1/subscriptions/dashboard_devices"


def test_router_subscriptions_endpoint_with_valid_api_key(test_client, router_subscription_factory):
    """Expect HTTP 200 and exactly three entries when a valid bearer token is sent.

    Five router subscriptions are created: three with the factory's defaults, one
    TERMINATED and one INITIAL. The JSON response is asserted to hold three items.
    """
    for _ in range(3):
        router_subscription_factory()
    for lifecycle in (SubscriptionLifecycle.TERMINATED, SubscriptionLifecycle.INITIAL):
        router_subscription_factory(status=lifecycle)

    auth_headers = {"Authorization": "Bearer another_REALY_random_AND_3cure_T0keN"}
    response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT, headers=auth_headers)

    assert response.status_code == 200
    payload = response.json()
    assert len(payload) == 3


def test_router_subscriptions_endpoint_with_invalid_api_key(test_client, router_subscription_factory):
    """Expect HTTP 403 with an "Invalid API Key" detail when the bearer token is wrong."""
    bad_headers = {"Authorization": "Bearer fake_invalid_api_key"}
    response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT, headers=bad_headers)

    assert response.status_code == 403
    expected_body = {"detail": "Invalid API Key"}
    assert response.json() == expected_body


def test_router_subscriptions_endpoint_without_api_key(test_client, router_subscription_factory):
    """Expect HTTP 403 with a "Not authenticated" detail when no Authorization header is sent."""
    response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT)

    assert response.status_code == 403
    expected_body = {"detail": "Not authenticated"}
    assert response.json() == expected_body


def test_dashboard_devices_endpoint_with_valid_api_key(
    test_client,
    router_subscription_factory,
    office_router_subscription_factory,
    super_pop_switch_subscription_factory,
):
    """Expect HTTP 200 and a newline-separated FQDN list when a valid bearer token is sent.

    Routers, office routers and super PoP switches are created in several lifecycle
    states. The asserted result contains the devices created with default status and
    those in PROVISIONING; the INITIAL router (mx4) and the TERMINATED office router
    (office2) and switch (superpop2) are expected to be absent.
    """
    # (factory, keyword arguments) pairs, applied in order.
    devices_to_create = (
        (router_subscription_factory, {"router_fqdn": "mx1.ams.nl.geant.net"}),
        (router_subscription_factory, {"router_fqdn": "mx2.ams.nl.geant.net"}),
        (
            router_subscription_factory,
            {
                "vendor": Vendor.NOKIA,
                "status": SubscriptionLifecycle.PROVISIONING,
                "router_fqdn": "mx3.ams.nl.geant.net",
            },
        ),
        (
            router_subscription_factory,
            {
                "vendor": Vendor.NOKIA,
                "status": SubscriptionLifecycle.INITIAL,
                "router_fqdn": "mx4.ams.nl.geant.net",
            },
        ),
        (office_router_subscription_factory, {"office_router_fqdn": "office1.ams.nl.geant.net"}),
        (
            office_router_subscription_factory,
            {"office_router_fqdn": "office2.ams.nl.geant.net", "status": SubscriptionLifecycle.TERMINATED},
        ),
        (
            office_router_subscription_factory,
            {"office_router_fqdn": "office3.ams.nl.geant.net", "status": SubscriptionLifecycle.PROVISIONING},
        ),
        (super_pop_switch_subscription_factory, {"super_pop_switch_fqdn": "superpop1.ams.nl.geant.net"}),
        (
            super_pop_switch_subscription_factory,
            {"super_pop_switch_fqdn": "superpop2.ams.nl.geant.net", "status": SubscriptionLifecycle.TERMINATED},
        ),
        (
            super_pop_switch_subscription_factory,
            {"super_pop_switch_fqdn": "superpop3.ams.nl.geant.net", "status": SubscriptionLifecycle.PROVISIONING},
        ),
    )
    for make_subscription, factory_kwargs in devices_to_create:
        make_subscription(**factory_kwargs)

    auth_headers = {"Authorization": "Bearer another_REALY_random_AND_3cure_T0keN"}
    response = test_client.get(DASHBOARD_DEVICES_ENDPOINT, headers=auth_headers)

    assert response.status_code == 200

    # The body is plain text with one FQDN per line; sort so the comparison is order-independent.
    returned_fqdns = sorted(response.text.strip().split("\n"))
    expected_fqdns = [
        "mx1.ams.nl.geant.net",
        "mx2.ams.nl.geant.net",
        "mx3.ams.nl.geant.net",
        "office1.ams.nl.geant.net",
        "office3.ams.nl.geant.net",
        "superpop1.ams.nl.geant.net",
        "superpop3.ams.nl.geant.net",
    ]
    assert returned_fqdns == expected_fqdns


def test_dashboard_devices_endpoint_with_invalid_api_key(test_client, router_subscription_factory):
    """Expect HTTP 403 with an "Invalid API Key" detail when the bearer token is wrong."""
    bad_headers = {"Authorization": "Bearer fake_invalid_api_key"}
    response = test_client.get(DASHBOARD_DEVICES_ENDPOINT, headers=bad_headers)

    assert response.status_code == 403
    expected_body = {"detail": "Invalid API Key"}
    assert response.json() == expected_body


def test_dashboard_devices_endpoint_without_api_key(test_client, router_subscription_factory):
    """Expect HTTP 403 with a "Not authenticated" detail when no Authorization header is sent."""
    response = test_client.get(DASHBOARD_DEVICES_ENDPOINT)

    assert response.status_code == 403
    expected_body = {"detail": "Not authenticated"}
    assert response.json() == expected_body