Skip to content
Snippets Groups Projects
Commit 7c15f413 authored by Hakan Calim's avatar Hakan Calim
Browse files

NAT-286: merged develop

parents 437ef774 e567da26
Branches
Tags
1 merge request!89Feature/nat 286 create unit tests for netbox client
Pipeline #84267 failed
from fastapi import APIRouter from fastapi import APIRouter
from gso.api.v1.imports import router as imports_router from gso.api.v1.imports import router as imports_router
from gso.api.v1.subscriptions import router as subscriptions_router
router = APIRouter() router = APIRouter()
router.include_router(imports_router) router.include_router(imports_router)
router.include_router(subscriptions_router)
...@@ -77,7 +77,8 @@ class IptrunkImportModel(BaseModel): ...@@ -77,7 +77,8 @@ class IptrunkImportModel(BaseModel):
@classmethod @classmethod
def _get_active_routers(cls) -> set[str]: def _get_active_routers(cls) -> set[str]:
return { return {
str(router_id) for router_id in subscriptions.get_active_router_subscriptions(fields=["subscription_id"]) str(router["subscription_id"])
for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"])
} }
@validator("customer") @validator("customer")
......
from typing import Any
from fastapi import Depends, status
from fastapi.routing import APIRouter
from orchestrator.domain import SubscriptionModel
from orchestrator.schemas import SubscriptionDomainModelSchema
from orchestrator.security import opa_security_default
from orchestrator.services.subscriptions import build_extended_domain_model
from gso.services.subscriptions import get_active_router_subscriptions
router = APIRouter(prefix="/subscriptions", tags=["Subscriptions"], dependencies=[Depends(opa_security_default)])
@router.get("/routers", status_code=status.HTTP_200_OK, response_model=list[SubscriptionDomainModelSchema])
def subscription_routers() -> list[dict[str, Any]]:
"""Retrieve all active routers subscriptions."""
subscriptions = []
for r in get_active_router_subscriptions():
subscription = SubscriptionModel.from_subscription(r["subscription_id"])
extended_model = build_extended_domain_model(subscription)
subscriptions.append(extended_model)
return subscriptions
from typing import Any
from uuid import UUID from uuid import UUID
from asyncio_redis import Subscription
from orchestrator.db import ( from orchestrator.db import (
ProductTable, ProductTable,
ResourceTypeTable, ResourceTypeTable,
...@@ -8,57 +8,71 @@ from orchestrator.db import ( ...@@ -8,57 +8,71 @@ from orchestrator.db import (
SubscriptionInstanceValueTable, SubscriptionInstanceValueTable,
SubscriptionTable, SubscriptionTable,
) )
from orchestrator.graphql.schemas.subscription import Subscription
from orchestrator.types import SubscriptionLifecycle from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductType from gso.products import ProductType
SubscriptionType = dict[str, Any]
def get_active_subscriptions(product_type: str, fields: list[str]) -> list[Subscription]:
def get_active_subscriptions(
product_type: str,
includes: list[str] | None = None,
excludes: list[str] | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions for a specific product type. """Retrieve active subscriptions for a specific product type.
:param product_type: The type of the product for which to retrieve subscriptions. :param product_type: The type of the product for which to retrieve subscriptions.
:type product_type: str :type product_type: str
:param fields: List of fields to be included in the returned Subscription objects. :param includes: List of fields to be included in the returned Subscription objects.
:type fields: list[str] :type includes: list[str]
:param excludes: List of fields to be excluded from the returned Subscription objects.
:type excludes: list[str]
:return: A list of Subscription objects that match the query. :return: A list of Subscription objects that match the query.
:rtype: list[Subscription] :rtype: list[Subscription]
""" """
dynamic_fields = [getattr(SubscriptionTable, field) for field in fields] if not includes:
includes = [col.name for col in SubscriptionTable.__table__.columns]
return ( if excludes:
SubscriptionTable.query.join(ProductTable) includes = [field for field in includes if field not in excludes]
.filter(
ProductTable.product_type == product_type, dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
) query = SubscriptionTable.query.join(ProductTable).filter(
.with_entities(*dynamic_fields) ProductTable.product_type == product_type,
.all() SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
) )
results = query.with_entities(*dynamic_fields).all()
return [dict(zip(includes, result)) for result in results]
def get_active_site_subscriptions(fields: list[str]) -> list[Subscription]: def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for sites. """Retrieve active subscriptions specifically for sites.
:param fields: The fields to be included in the returned Subscription objects. :param includes: The fields to be included in the returned Subscription objects.
:type fields: list[str] :type includes: list[str]
:return: A list of Subscription objects for sites. :return: A list of Subscription objects for sites.
:rtype: list[Subscription] :rtype: list[Subscription]
""" """
return get_active_subscriptions(ProductType.SITE, fields) return get_active_subscriptions(product_type=ProductType.SITE, includes=includes)
def get_active_router_subscriptions(fields: list[str]) -> list[Subscription]: def get_active_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for routers. """Retrieve active subscriptions specifically for routers.
:param fields: The fields to be included in the returned Subscription objects. :param includes: The fields to be included in the returned Subscription objects.
:type fields: list[str] :type includes: list[str]
:return: A list of Subscription objects for routers. :return: A list of Subscription objects for routers.
:rtype: list[Subscription] :rtype: list[Subscription]
""" """
return get_active_subscriptions(product_type=ProductType.ROUTER, fields=fields) return get_active_subscriptions(product_type=ProductType.ROUTER, includes=includes)
def get_product_id_by_name(product_name: ProductType) -> UUID: def get_product_id_by_name(product_name: ProductType) -> UUID:
......
...@@ -33,10 +33,9 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: ...@@ -33,10 +33,9 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
# * interface names must be validated # * interface names must be validated
routers = {} routers = {}
for router_id, router_description in subscriptions.get_active_router_subscriptions(
fields=["subscription_id", "description"] for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"]):
): routers[str(router["subscription_id"])] = router["description"]
routers[str(router_id)] = router_description
class CreateIptrunkForm(FormPage): class CreateIptrunkForm(FormPage):
class Config: class Config:
......
...@@ -23,10 +23,8 @@ from gso.utils.helpers import iso_from_ipv4 ...@@ -23,10 +23,8 @@ from gso.utils.helpers import iso_from_ipv4
def _site_selector() -> Choice: def _site_selector() -> Choice:
site_subscriptions = {} site_subscriptions = {}
for site_id, site_description in subscriptions.get_active_site_subscriptions( for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"]):
fields=["subscription_id", "description"] site_subscriptions[str(site["subscription_id"])] = site["description"]
):
site_subscriptions[str(site_id)] = site_description
# noinspection PyTypeChecker # noinspection PyTypeChecker
return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items())) # type: ignore[arg-type] return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items())) # type: ignore[arg-type]
......
...@@ -20,10 +20,9 @@ from gso.workflows.iptrunk.create_iptrunk import initialize_subscription ...@@ -20,10 +20,9 @@ from gso.workflows.iptrunk.create_iptrunk import initialize_subscription
def _generate_routers() -> dict[str, str]: def _generate_routers() -> dict[str, str]:
"""Generate a dictionary of router IDs and descriptions.""" """Generate a dictionary of router IDs and descriptions."""
routers = {} routers = {}
for router_id, router_description in subscriptions.get_active_router_subscriptions( for subscription in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"]):
fields=["subscription_id", "description"] routers[str(subscription["subscription_id"])] = subscription["description"]
):
routers[str(router_id)] = router_description
return routers return routers
......
...@@ -86,6 +86,7 @@ def router_subscription_factory(site_subscription_factory, faker): ...@@ -86,6 +86,7 @@ def router_subscription_factory(site_subscription_factory, faker):
router_role=RouterRole.PE, router_role=RouterRole.PE,
router_site=None, router_site=None,
router_is_ias_connected=True, router_is_ias_connected=True,
status: SubscriptionLifecycle | None = None,
) -> UUIDstr: ) -> UUIDstr:
description = description or faker.text(max_nb_chars=30) description = description or faker.text(max_nb_chars=30)
router_fqdn = router_fqdn or faker.domain_name(levels=4) router_fqdn = router_fqdn or faker.domain_name(levels=4)
...@@ -118,6 +119,10 @@ def router_subscription_factory(site_subscription_factory, faker): ...@@ -118,6 +119,10 @@ def router_subscription_factory(site_subscription_factory, faker):
router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE) router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
router_subscription.description = description router_subscription.description = description
router_subscription.start_date = start_date router_subscription.start_date = start_date
if status:
router_subscription.status = status
router_subscription.save() router_subscription.save()
db.session.commit() db.session.commit()
......
...@@ -48,13 +48,23 @@ def mock_routers(iptrunk_data): ...@@ -48,13 +48,23 @@ def mock_routers(iptrunk_data):
with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions: with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions:
def _active_router_subscriptions(*args, **kwargs): def _active_router_subscriptions(*args, **kwargs):
if kwargs["fields"] == ["subscription_id", "description"]: if kwargs["includes"] == ["subscription_id", "description"]:
return [ return [
(iptrunk_data["side_a_node_id"], "side_a_node_id description"), {
(iptrunk_data["side_b_node_id"], "side_b_node_id description"), "subscription_id": iptrunk_data["side_a_node_id"],
(str(uuid4()), "random description"), "description": "iptrunk_sideA_node_id description",
},
{
"subscription_id": iptrunk_data["side_b_node_id"],
"description": "iptrunk_sideB_node_id description",
},
{"subscription_id": str(uuid4()), "description": "random description"},
] ]
return [iptrunk_data["side_a_node_id"], iptrunk_data["side_b_node_id"], str(uuid4())] return [
{"subscription_id": iptrunk_data["side_a_node_id"]},
{"subscription_id": iptrunk_data["side_b_node_id"]},
{"subscription_id": str(uuid4())},
]
mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions
yield mock_get_active_router_subscriptions yield mock_get_active_router_subscriptions
...@@ -200,6 +210,9 @@ def test_import_iptrunk_invalid_customer(mock_start_process, test_client, mock_r ...@@ -200,6 +210,9 @@ def test_import_iptrunk_invalid_customer(mock_start_process, test_client, mock_r
@patch("gso.api.v1.imports._start_process") @patch("gso.api.v1.imports._start_process")
def test_import_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data): def test_import_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data):
iptrunk_data["side_a_node_id"] = "NOT FOUND"
iptrunk_data["side_b_node_id"] = "NOT FOUND"
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
......
from test.fixtures import router_subscription_factory, site_subscription_factory # noqa
from orchestrator.types import SubscriptionLifecycle
ROUTER_SUBSCRIPTION_ENDPOINT = "/api/v1/subscriptions/routers"
def test_router_subscriptions_endpoint(test_client, router_subscription_factory):
router_subscription_factory()
router_subscription_factory()
router_subscription_factory()
router_subscription_factory(status=SubscriptionLifecycle.TERMINATED)
router_subscription_factory(status=SubscriptionLifecycle.INITIAL)
response = test_client.get(ROUTER_SUBSCRIPTION_ENDPOINT)
assert response.status_code == 200
assert len(response.json()) == 3
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment