Skip to content
Snippets Groups Projects
Verified Commit 10940d16 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Remove API endpoints for importing subscriptions, and move all functionality into the CLI

parent 61743c33
No related branches found
No related tags found
1 merge request!201Add imported products
"""Helper methods for the :term:`API` module."""
from uuid import UUID
from fastapi import HTTPException
from orchestrator.services import processes
from pydantic import BaseModel
from starlette import status
def _start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data])
if pid is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to start the process.",
)
process = processes._get_process(pid) # noqa: SLF001
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
class ImportResponseModel(BaseModel):
"""The model of a response given when services are imported using the :term:`API`."""
pid: UUID
detail: str
...@@ -2,14 +2,12 @@ ...@@ -2,14 +2,12 @@
from fastapi import APIRouter from fastapi import APIRouter
from gso.api.v1.imports import api_router as imports_router
from gso.api.v1.network import router as network_router from gso.api.v1.network import router as network_router
from gso.api.v1.processes import router as processes_router from gso.api.v1.processes import router as processes_router
from gso.api.v1.subscriptions import router as subscriptions_router from gso.api.v1.subscriptions import router as subscriptions_router
router = APIRouter() router = APIRouter()
router.include_router(imports_router)
router.include_router(subscriptions_router) router.include_router(subscriptions_router)
router.include_router(processes_router) router.include_router(processes_router)
router.include_router(network_router) router.include_router(network_router)
""":term:`API` routes for adding existing products to the database."""
from fastapi import Depends
from fastapi.routing import APIRouter
from gso.api.v1.imports.iptrunk import router as iptrunk_router
from gso.api.v1.imports.office_router import router as office_router_router
from gso.api.v1.imports.router import router as router_router
from gso.api.v1.imports.site import router as site_router
from gso.api.v1.imports.super_pop_switch import router as super_pop_switch_router
from gso.auth.security import opa_security_default
api_router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
api_router.include_router(iptrunk_router)
api_router.include_router(office_router_router)
api_router.include_router(router_router)
api_router.include_router(site_router)
api_router.include_router(super_pop_switch_router)
""":term:`API` endpoints for importing IP trunks."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel, root_validator, validator
from gso.api.helpers import ImportResponseModel, _start_process
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.services import subscriptions
from gso.services.partners import PartnerNotFoundError, get_partner_by_name
from gso.utils.helpers import LAGMember
router = APIRouter()
class IptrunkImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.products.product_types.iptrunk`."""
partner: str
geant_s_sid: str | None
iptrunk_type: IptrunkType
iptrunk_description: str
iptrunk_speed: PhysicalPortCapacity
iptrunk_minimum_links: int
iptrunk_isis_metric: int
side_a_node_id: str
side_a_ae_iface: str
side_a_ae_geant_a_sid: str | None
side_a_ae_members: list[LAGMember]
side_b_node_id: str
side_b_ae_iface: str
side_b_ae_geant_a_sid: str | None
side_b_ae_members: list[LAGMember]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
@classmethod
def _get_active_routers(cls) -> set[str]:
return {
str(router["subscription_id"])
for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"])
}
@validator("partner")
def check_if_partner_exists(cls, value: str) -> str:
"""Validate that the partner exists."""
try:
get_partner_by_name(value)
except PartnerNotFoundError as e:
msg = f"partner {value} not found"
raise ValueError(msg) from e
return value
@validator("side_a_node_id", "side_b_node_id")
def check_if_router_side_is_available(cls, value: str) -> str:
"""Both sides of the trunk must exist in :term:`GSO`."""
if value not in cls._get_active_routers():
msg = f"Router {value} not found"
raise ValueError(msg)
return value
@validator("side_a_ae_members", "side_b_ae_members")
def check_side_uniqueness(cls, value: list[str]) -> list[str]:
""":term:`LAG` members must be unique."""
if len(value) != len(set(value)):
msg = "Items must be unique"
raise ValueError(msg)
return value
@root_validator
def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Amount of :term:`LAG` members has to match on side A and B, and meet the minimum requirement."""
min_links = values["iptrunk_minimum_links"]
side_a_members = values.get("side_a_ae_members", [])
side_b_members = values.get("side_b_ae_members", [])
len_a = len(side_a_members)
len_b = len(side_b_members)
if len_a < min_links:
msg = f"Side A members should be at least {min_links} (iptrunk_minimum_links)"
raise ValueError(msg)
if len_a != len_b:
msg = "Mismatch between Side A and B members"
raise ValueError(msg)
return values
@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
"""Import an iptrunk by running the create_imported_iptrunk workflow.
:param iptrunk_data: The iptrunk information to be imported.
:type iptrunk_data: IptrunkImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_iptrunk", iptrunk_data.dict())
return {"detail": "Iptrunk has been added successfully", "pid": pid}
""":term:`API` endpoints for importing office Routers."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from gso.api.helpers import ImportResponseModel, _start_process
from gso.utils.shared_enums import PortNumber
router = APIRouter()
class OfficeRouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.office_router`."""
partner: str
office_router_site: str
office_router_fqdn: str
office_router_ts_port: PortNumber
office_router_lo_ipv4_address: ipaddress.IPv4Address
office_router_lo_ipv6_address: ipaddress.IPv6Address
@router.post("/office-routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_office_router(office_router_data: OfficeRouterImportModel) -> dict[str, Any]:
"""Import an office router by running the create_imported_office_router workflow.
:param office_router_data: The office router information to be imported.
:type office_router_data: OfficeRouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_office_router", office_router_data.dict())
return {"detail": "Office router has been added successfully", "pid": pid}
""":term:`API` endpoints for importing Routers."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from gso.api.helpers import ImportResponseModel, _start_process
from gso.products.product_blocks.router import RouterRole
from gso.utils.shared_enums import Vendor
router = APIRouter()
class RouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.router`."""
partner: str
router_site: str
hostname: str
ts_port: int
router_vendor: Vendor
router_role: RouterRole
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_router(router_data: RouterImportModel) -> dict[str, Any]:
"""Import a router by running the create_imported_router workflow.
:param router_data: The router information to be imported.
:type router_data: RouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_router", router_data.dict())
return {"detail": "Router has been added successfully", "pid": pid}
""":term:`API` endpoints for importing Sites."""
from typing import Any
from fastapi import APIRouter, status
from gso.api.helpers import ImportResponseModel, _start_process
from gso.products.product_blocks.site import SiteTier
from gso.utils.helpers import BaseSiteValidatorModel
router = APIRouter()
class SiteImportModel(BaseSiteValidatorModel):
"""The required input for importing an existing :class:`gso.products.product_types.site`."""
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
partner: str
@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_site(site: SiteImportModel) -> dict[str, Any]:
"""Import a site by running the create_imported_site workflow.
:param site: The site information to be imported.
:type site: SiteImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If the site already exists or if there's an error in the process.
"""
pid = _start_process("create_imported_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid}
""":term:`API` endpoints for importing Super PoP switches."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from gso.api.helpers import ImportResponseModel, _start_process
from gso.utils.shared_enums import PortNumber
router = APIRouter()
class SuperPopSwitchImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.super_pop_switch`."""
partner: str
super_pop_switch_site: str
hostname: str
super_pop_switch_ts_port: PortNumber
super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address
@router.post("/super-pop-switches", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_super_pop_switch(super_pop_switch_data: SuperPopSwitchImportModel) -> dict[str, Any]:
"""Import a Super PoP switch by running the create_imported_super_pop_switch workflow.
:param super_pop_switch_data: The Super PoP switch information to be imported.
:type super_pop_switch_data: SuperPopSwitchImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_super_pop_switch", super_pop_switch_data.dict())
return {"detail": "Super PoP switch has been added successfully", "pid": pid}
""":term:`CLI` command for importing data to coreDB.""" """:term:`CLI` commands for importing data to coreDB."""
import csv import csv
import ipaddress import ipaddress
import json import json
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
from typing import TypeVar from typing import Any, TypeVar
import typer import typer
import yaml import yaml
from orchestrator.db import db from orchestrator.db import db
from orchestrator.services.processes import start_process from orchestrator.services.processes import start_process
from orchestrator.types import SubscriptionLifecycle from orchestrator.types import SubscriptionLifecycle
from pydantic import ValidationError from pydantic import BaseModel, ValidationError, root_validator, validator
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from gso.api.v1.imports.iptrunk import IptrunkImportModel, create_imported_iptrunk
from gso.api.v1.imports.office_router import OfficeRouterImportModel, create_imported_office_router
from gso.api.v1.imports.router import RouterImportModel, create_imported_router
from gso.api.v1.imports.site import SiteImportModel, create_imported_site
from gso.api.v1.imports.super_pop_switch import SuperPopSwitchImportModel, create_imported_super_pop_switch
from gso.db.models import PartnerTable from gso.db.models import PartnerTable
from gso.products import ProductType from gso.products import ProductType
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value, get_subscriptions from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.services.partners import PartnerNotFoundError, get_partner_by_name
from gso.services.subscriptions import (
get_active_router_subscriptions,
get_active_subscriptions_by_field_and_value,
get_subscriptions,
)
from gso.utils.helpers import BaseSiteValidatorModel, LAGMember
from gso.utils.shared_enums import PortNumber, Vendor
app: typer.Typer = typer.Typer() app: typer.Typer = typer.Typer()
class SiteImportModel(BaseSiteValidatorModel):
"""The required input for importing an existing :class:`gso.products.product_types.site`."""
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
partner: str
class RouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.router`."""
partner: str
router_site: str
hostname: str
ts_port: int
router_vendor: Vendor
router_role: RouterRole
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
class SuperPopSwitchImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.super_pop_switch`."""
partner: str
super_pop_switch_site: str
hostname: str
super_pop_switch_ts_port: PortNumber
super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address
class OfficeRouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.office_router`."""
partner: str
office_router_site: str
office_router_fqdn: str
office_router_ts_port: PortNumber
office_router_lo_ipv4_address: ipaddress.IPv4Address
office_router_lo_ipv6_address: ipaddress.IPv6Address
class IptrunkImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.products.product_types.iptrunk`."""
partner: str
geant_s_sid: str | None
iptrunk_type: IptrunkType
iptrunk_description: str
iptrunk_speed: PhysicalPortCapacity
iptrunk_minimum_links: int
iptrunk_isis_metric: int
side_a_node_id: str
side_a_ae_iface: str
side_a_ae_geant_a_sid: str | None
side_a_ae_members: list[LAGMember]
side_b_node_id: str
side_b_ae_iface: str
side_b_ae_geant_a_sid: str | None
side_b_ae_members: list[LAGMember]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
@classmethod
def _get_active_routers(cls) -> set[str]:
return {
str(router["subscription_id"]) for router in get_active_router_subscriptions(includes=["subscription_id"])
}
@validator("partner")
def check_if_partner_exists(cls, value: str) -> str:
"""Validate that the partner exists."""
try:
get_partner_by_name(value)
except PartnerNotFoundError as e:
msg = f"partner {value} not found"
raise ValueError(msg) from e
return value
@validator("side_a_node_id", "side_b_node_id")
def check_if_router_side_is_available(cls, value: str) -> str:
"""Both sides of the trunk must exist in :term:`GSO`."""
if value not in cls._get_active_routers():
msg = f"Router {value} not found"
raise ValueError(msg)
return value
@validator("side_a_ae_members", "side_b_ae_members")
def check_side_uniqueness(cls, value: list[str]) -> list[str]:
""":term:`LAG` members must be unique."""
if len(value) != len(set(value)):
msg = "Items must be unique"
raise ValueError(msg)
return value
@root_validator
def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Amount of :term:`LAG` members has to match on side A and B, and meet the minimum requirement."""
min_links = values["iptrunk_minimum_links"]
side_a_members = values.get("side_a_ae_members", [])
side_b_members = values.get("side_b_ae_members", [])
len_a = len(side_a_members)
len_b = len(side_b_members)
if len_a < min_links:
msg = f"Side A members should be at least {min_links} (iptrunk_minimum_links)"
raise ValueError(msg)
if len_a != len_b:
msg = "Mismatch between Side A and B members"
raise ValueError(msg)
return values
T = TypeVar( T = TypeVar(
"T", SiteImportModel, RouterImportModel, IptrunkImportModel, SuperPopSwitchImportModel, OfficeRouterImportModel "T", SiteImportModel, RouterImportModel, IptrunkImportModel, SuperPopSwitchImportModel, OfficeRouterImportModel
) )
...@@ -36,10 +171,9 @@ common_filepath_option = typer.Option( ...@@ -36,10 +171,9 @@ common_filepath_option = typer.Option(
) )
def _read_data(filepath: str) -> dict: def _read_data(file_path: Path) -> dict:
"""Read data from a JSON or YAML file.""" """Read data from a JSON or YAML file."""
typer.echo(f"Starting import from {filepath}") typer.echo(f"Starting import from {file_path!s}")
file_path = Path(filepath)
file_extension = file_path.suffix.lower() file_extension = file_path.suffix.lower()
with file_path.open("r") as f: with file_path.open("r") as f:
...@@ -54,34 +188,6 @@ def _read_data(filepath: str) -> dict: ...@@ -54,34 +188,6 @@ def _read_data(filepath: str) -> dict:
raise typer.Exit(code=1) raise typer.Exit(code=1)
def _generic_import_data(
filepath: str,
import_model: type[T],
import_function: callable, # type: ignore[valid-type]
name_key: str,
) -> None:
"""Import data from a JSON or YAML file."""
successfully_imported_data = []
data = _read_data(filepath)
for details in data:
details["partner"] = "GEANT"
typer.echo(f"Importing {name_key}: {details[name_key]}")
try:
initial_data = import_model(**details)
import_function(initial_data) # type: ignore[misc]
successfully_imported_data.append(getattr(initial_data, name_key))
typer.echo(
f"Successfully imported {name_key}: {getattr(initial_data, name_key)}",
)
except ValidationError as e:
typer.echo(f"Validation error: {e}")
if successfully_imported_data:
typer.echo(f"Successfully imported {name_key}s:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
def _get_router_subscription_id(node_name: str) -> str | None: def _get_router_subscription_id(node_name: str) -> str | None:
"""Get the subscription id for a router by its node name.""" """Get the subscription id for a router by its node name."""
subscriptions = get_active_subscriptions_by_field_and_value( subscriptions = get_active_subscriptions_by_field_and_value(
...@@ -100,63 +206,81 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]: ...@@ -100,63 +206,81 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]:
return list(csv_reader) return list(csv_reader)
def _generic_import_product(
file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T]
) -> None:
"""Import subscriptions from a JSON or YAML file."""
successfully_imported_data = []
data = _read_data(file_path)
for details in data:
details["partner"] = "GEANT"
typer.echo(f"Creating imported {name_key}: {details[name_key]}")
try:
initial_data = import_model(**details)
start_process(f"create_imported_{workflow_suffix}", [initial_data.dict()])
successfully_imported_data.append(getattr(initial_data, name_key))
typer.echo(
f"Successfully created {name_key}: {getattr(initial_data, name_key)}",
)
except ValidationError as e:
typer.echo(f"Validation error: {e}")
# Migrate new products from imported to "full" counterpart.
imported_products = get_subscriptions(
[imported_product_type], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in imported_products:
typer.echo(f"Importing {subscription_id}")
start_process(f"import_{workflow_suffix}", [subscription_id])
if successfully_imported_data:
typer.echo(f"Successfully created imported {name_key}s:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
typer.echo(f"Please validate no more imported {workflow_suffix} products exist anymore in the database.")
@app.command() @app.command()
def import_sites(filepath: str = common_filepath_option) -> None: def import_sites(filepath: str = common_filepath_option) -> None:
"""Import sites into GSO.""" """Import sites into GSO."""
# Use the import_data function to handle common import logic _generic_import_product(Path(filepath), ProductType.IMPORTED_SITE, "site", "site_name", SiteImportModel)
_generic_import_data(filepath, SiteImportModel, create_imported_site, "site_name")
site_ids = get_subscriptions(
[ProductType.IMPORTED_SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in site_ids:
start_process("import_site", [subscription_id])
@app.command() @app.command()
def import_routers(filepath: str = common_filepath_option) -> None: def import_routers(filepath: str = common_filepath_option) -> None:
"""Import routers into GSO.""" """Import routers into GSO."""
# Use the import_data function to handle common import logic _generic_import_product(Path(filepath), ProductType.IMPORTED_ROUTER, "router", "hostname", RouterImportModel)
_generic_import_data(filepath, RouterImportModel, create_imported_router, "hostname")
router_ids = get_subscriptions(
[ProductType.IMPORTED_ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in router_ids:
start_process("import_router", [subscription_id])
@app.command() @app.command()
def import_super_pop_switches(filepath: str = common_filepath_option) -> None: def import_super_pop_switches(filepath: str = common_filepath_option) -> None:
"""Import Super PoP Switches into GSO.""" """Import Super PoP Switches into GSO."""
# Use the import_data function to handle common import logic _generic_import_product(
_generic_import_data(filepath, SuperPopSwitchImportModel, create_imported_super_pop_switch, "hostname") Path(filepath),
ProductType.IMPORTED_SUPER_POP_SWITCH,
super_pop_switch_ids = get_subscriptions( "super_pop_switch",
[ProductType.IMPORTED_SUPER_POP_SWITCH], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"] "hostname",
SuperPopSwitchImportModel,
) )
for subscription_id in super_pop_switch_ids:
start_process("import_super_pop_switch", [subscription_id])
@app.command() @app.command()
def import_office_routers(filepath: str = common_filepath_option) -> None: def import_office_routers(filepath: str = common_filepath_option) -> None:
"""Import office routers into GSO.""" """Import office routers into GSO."""
# Use the import_data function to handle common import logic _generic_import_product(
_generic_import_data(filepath, OfficeRouterImportModel, create_imported_office_router, "office_router_fqdn") Path(filepath),
ProductType.IMPORTED_OFFICE_ROUTER,
office_router_ids = get_subscriptions( "office_router",
[ProductType.IMPORTED_OFFICE_ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"] "office_router_fqdn",
OfficeRouterImportModel,
) )
for subscription_id in office_router_ids:
start_process("import_office_router", [subscription_id])
@app.command() @app.command()
def import_iptrunks(filepath: str = common_filepath_option) -> None: def import_iptrunks(filepath: str = common_filepath_option) -> None:
"""Import IP trunks into GSO.""" """Import IP trunks into GSO."""
successfully_imported_data = [] successfully_imported_data = []
data = _read_data(filepath) data = _read_data(Path(filepath))
for trunk in data: for trunk in data:
ipv4_network_a = ipaddress.IPv4Network(trunk["config"]["nodeA"]["ipv4_address"], strict=False) ipv4_network_a = ipaddress.IPv4Network(trunk["config"]["nodeA"]["ipv4_address"], strict=False)
ipv4_network_b = ipaddress.IPv4Network(trunk["config"]["nodeB"]["ipv4_address"], strict=False) ipv4_network_b = ipaddress.IPv4Network(trunk["config"]["nodeB"]["ipv4_address"], strict=False)
...@@ -201,7 +325,7 @@ def import_iptrunks(filepath: str = common_filepath_option) -> None: ...@@ -201,7 +325,7 @@ def import_iptrunks(filepath: str = common_filepath_option) -> None:
iptrunk_ipv4_network=iptrunk_ipv4_network, iptrunk_ipv4_network=iptrunk_ipv4_network,
iptrunk_ipv6_network=iptrunk_ipv6_network, iptrunk_ipv6_network=iptrunk_ipv6_network,
) )
create_imported_iptrunk(initial_data) start_process("create_imported_iptrunk", [initial_data.dict()])
successfully_imported_data.append(trunk["id"]) successfully_imported_data.append(trunk["id"])
typer.echo(f"Successfully imported IP Trunk: {trunk['id']}") typer.echo(f"Successfully imported IP Trunk: {trunk['id']}")
except ValidationError as e: except ValidationError as e:
...@@ -211,6 +335,7 @@ def import_iptrunks(filepath: str = common_filepath_option) -> None: ...@@ -211,6 +335,7 @@ def import_iptrunks(filepath: str = common_filepath_option) -> None:
[ProductType.IMPORTED_IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"] [ProductType.IMPORTED_IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
) )
for subscription_id in trunk_ids: for subscription_id in trunk_ids:
typer.echo(f"Migrating iptrunk {subscription_id}")
start_process("import_iptrunk", [subscription_id]) start_process("import_iptrunk", [subscription_id])
if successfully_imported_data: if successfully_imported_data:
......
...@@ -5,6 +5,7 @@ pycountry==22.3.5 ...@@ -5,6 +5,7 @@ pycountry==22.3.5
pynetbox==7.2.0 pynetbox==7.2.0
celery-redbeat==2.1.1 celery-redbeat==2.1.1
celery==5.3.4 celery==5.3.4
typer==0.7.0
# Test and linting dependencies # Test and linting dependencies
celery-stubs==0.1.3 celery-stubs==0.1.3
......
...@@ -16,6 +16,7 @@ setup( ...@@ -16,6 +16,7 @@ setup(
"pynetbox==7.2.0", "pynetbox==7.2.0",
"celery-redbeat==2.1.1", "celery-redbeat==2.1.1",
"celery==5.3.4", "celery==5.3.4",
"typer==0.7.0",
], ],
include_package_data=True, include_package_data=True,
) )
from unittest.mock import patch
from uuid import uuid4
import pytest
from orchestrator.db import SubscriptionTable
from orchestrator.services import subscriptions
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.utils.helpers import iso_from_ipv4
from gso.utils.shared_enums import Vendor
SITE_IMPORT_ENDPOINT = "/api/v1/imports/sites"
ROUTER_IMPORT_ENDPOINT = "/api/v1/imports/routers"
IPTRUNK_IMPORT_API_URL = "/api/v1/imports/iptrunks"
SUPER_POP_SWITCH_IMPORT_API_URL = "/api/v1/imports/super-pop-switches"
OFFICE_ROUTER_IMPORT_API_URL = "/api/v1/imports/office-routers"
@pytest.fixture()
def iptrunk_data(nokia_router_subscription_factory, faker):
router_side_a = nokia_router_subscription_factory()
router_side_b = nokia_router_subscription_factory()
return {
"partner": "GEANT",
"geant_s_sid": faker.geant_sid(),
"iptrunk_type": IptrunkType.DARK_FIBER,
"iptrunk_description": faker.sentence(),
"iptrunk_speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
"iptrunk_minimum_links": 5,
"iptrunk_isis_metric": 500,
"side_a_node_id": router_side_a,
"side_a_ae_iface": faker.network_interface(),
"side_a_ae_geant_a_sid": faker.geant_sid(),
"side_a_ae_members": [
{
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
}
for _ in range(5)
],
"side_b_node_id": router_side_b,
"side_b_ae_iface": faker.network_interface(),
"side_b_ae_geant_a_sid": faker.geant_sid(),
"side_b_ae_members": [
{
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
}
for _ in range(5)
],
"iptrunk_ipv4_network": str(faker.ipv4(network=True)),
"iptrunk_ipv6_network": str(faker.ipv6(network=True)),
}
@pytest.fixture()
def mock_routers(iptrunk_data):
with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions:
def _active_router_subscriptions(*args, **kwargs):
if kwargs["includes"] == ["subscription_id", "description"]:
return [
{
"subscription_id": iptrunk_data["side_a_node_id"],
"description": "iptrunk_sideA_node_id description",
},
{
"subscription_id": iptrunk_data["side_b_node_id"],
"description": "iptrunk_sideB_node_id description",
},
{
"subscription_id": str(uuid4()),
"description": "random description",
},
]
return [
{"subscription_id": iptrunk_data["side_a_node_id"]},
{"subscription_id": iptrunk_data["side_b_node_id"]},
{"subscription_id": str(uuid4())},
]
mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions
yield mock_get_active_router_subscriptions
@patch("gso.api.v1.imports.iptrunk._start_process")
def test_create_imported_iptrunk_successful_with_mocked_process(
mock_start_process, test_client, mock_routers, iptrunk_data
):
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 201
assert response.json()["pid"] == "123e4567-e89b-12d3-a456-426655440000"
@pytest.fixture()
def site_data(faker):
return {
"site_name": faker.site_name(),
"site_city": faker.city(),
"site_country": faker.country(),
"site_country_code": faker.country_code(),
"site_latitude": float(faker.latitude()),
"site_longitude": float(faker.longitude()),
"site_bgp_community_id": faker.pyint(),
"site_internal_id": faker.pyint(),
"site_tier": SiteTier.TIER1,
"site_ts_address": faker.ipv4(),
"partner": "GEANT",
}
@pytest.fixture()
def router_data(faker, site_data):
mock_ipv4 = faker.ipv4()
return {
"hostname": "127.0.0.1",
"router_role": RouterRole.PE,
"router_vendor": Vendor.JUNIPER,
"router_site": site_data["site_name"],
"ts_port": 1234,
"partner": "GEANT",
"router_lo_ipv4_address": mock_ipv4,
"router_lo_ipv6_address": faker.ipv6(),
"router_lo_iso_address": iso_from_ipv4(mock_ipv4),
}
@pytest.fixture()
def super_pop_switch_data(faker, site_data):
mock_ipv4 = faker.ipv4()
return {
"hostname": "127.0.0.1",
"super_pop_switch_site": site_data["site_name"],
"super_pop_switch_ts_port": 1234,
"partner": "GEANT",
"super_pop_switch_mgmt_ipv4_address": mock_ipv4,
}
@pytest.fixture()
def office_router_data(faker, site_data):
return {
"office_router_fqdn": "127.0.0.1",
"office_router_site": site_data["site_name"],
"office_router_ts_port": 1234,
"partner": "GEANT",
"office_router_lo_ipv4_address": faker.ipv4(),
"office_router_lo_ipv6_address": faker.ipv6(),
}
def test_create_imported_site_endpoint(test_client, site_data):
assert SubscriptionTable.query.all() == []
# Post data to the endpoint
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert "detail" in response.json()
assert "pid" in response.json()
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
resource_type="site_name",
value=site_data["site_name"],
)
assert subscription is not None
def test_create_imported_site_endpoint_with_existing_site(test_client, site_data):
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert SubscriptionTable.query.count() == 1
assert response.status_code == 201
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 1
def test_create_imported_site_endpoint_with_invalid_data(test_client, site_data):
# invalid data, missing site_latitude and invalid site_longitude
site_data.pop("site_latitude")
site_data["site_longitude"] = "invalid"
assert SubscriptionTable.query.count() == 0
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 0
response = response.json()
assert response["detail"][0]["loc"] == ["body", "site_latitude"]
assert response["detail"][0]["msg"] == "field required"
assert response["detail"][1]["loc"] == ["body", "site_longitude"]
assert response["detail"][1]["msg"] == "value is not a valid float"
def test_create_imported_router_endpoint(test_client, site_data, router_data):
# Create a site first
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 2
def test_create_imported_router_endpoint_with_invalid_data(test_client, site_data, router_data):
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
# invalid data, missing hostname and invalid router_lo_ipv6_address
router_data.pop("hostname")
router_data["router_lo_ipv6_address"] = "invalid"
response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 1
response = response.json()
assert response["detail"][0]["loc"] == ["body", "hostname"]
assert response["detail"][0]["msg"] == "field required"
assert response["detail"][1]["loc"] == ["body", "router_lo_ipv6_address"]
assert response["detail"][1]["msg"] == "value is not a valid IPv6 address"
def test_create_imported_iptrunk_successful_with_real_process(test_client, mock_routers, iptrunk_data):
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 201
response = response.json()
assert "detail" in response
assert "pid" in response
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
resource_type="geant_s_sid",
value=iptrunk_data["geant_s_sid"],
)
assert subscription is not None
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_invalid_partner(mock_start_process, test_client, mock_routers, iptrunk_data):
iptrunk_data["partner"] = "not_existing_partner"
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 422
assert response.json() == {
"detail": [
{
"loc": ["body", "partner"],
"msg": "partner not_existing_partner not found",
"type": "value_error",
},
],
}
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data):
iptrunk_data["side_a_node_id"] = "NOT FOUND"
iptrunk_data["side_b_node_id"] = "NOT FOUND"
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 422
assert response.json() == {
"detail": [
{
"loc": ["body", "side_a_node_id"],
"msg": f"Router {iptrunk_data['side_a_node_id']} not found",
"type": "value_error",
},
{
"loc": ["body", "side_b_node_id"],
"msg": f"Router {iptrunk_data['side_b_node_id']} not found",
"type": "value_error",
},
],
}
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_non_unique_members_side_a(
mock_start_process, test_client, mock_routers, iptrunk_data, faker
):
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
repeat_interface_a = {
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
}
repeat_interface_b = {
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
}
iptrunk_data["side_a_ae_members"] = [repeat_interface_a for _ in range(5)]
iptrunk_data["side_b_ae_members"] = [repeat_interface_b for _ in range(5)]
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 422
assert response.json() == {
"detail": [
{
"loc": ["body", "side_a_ae_members"],
"msg": "Items must be unique",
"type": "value_error",
},
{
"loc": ["body", "side_b_ae_members"],
"msg": "Items must be unique",
"type": "value_error",
},
{
"loc": ["body", "__root__"],
"msg": "Side A members should be at least 5 (iptrunk_minimum_links)",
"type": "value_error",
},
],
}
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_fails_on_side_a_member_count_mismatch(
mock_start_process,
test_client,
mock_routers,
iptrunk_data,
):
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
iptrunk_data["side_a_ae_members"].remove(iptrunk_data["side_a_ae_members"][0])
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 422
assert response.json() == {
"detail": [
{
"loc": ["body", "__root__"],
"msg": "Side A members should be at least 5 (iptrunk_minimum_links)",
"type": "value_error",
},
],
}
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_fails_on_side_a_and_b_members_mismatch(
mock_start_process,
test_client,
iptrunk_data,
mock_routers,
):
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
iptrunk_data["side_b_ae_members"].remove(iptrunk_data["side_b_ae_members"][0])
response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data)
assert response.status_code == 422
assert response.json() == {
"detail": [
{
"loc": ["body", "__root__"],
"msg": "Mismatch between Side A and B members",
"type": "value_error",
},
],
}
def test_create_imported_super_pop_switch_endpoint(test_client, site_data, super_pop_switch_data):
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 2
def test_create_imported_super_pop_switch_endpoint_with_invalid_data(test_client, site_data, super_pop_switch_data):
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
# invalid data, missing hostname and invalid mgmt_ipv4_address
super_pop_switch_data.pop("hostname")
super_pop_switch_data["super_pop_switch_mgmt_ipv4_address"] = "invalid"
response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 1
response = response.json()
assert response["detail"][0]["loc"] == ["body", "hostname"]
assert response["detail"][0]["msg"] == "field required"
assert response["detail"][1]["loc"] == ["body", "super_pop_switch_mgmt_ipv4_address"]
assert response["detail"][1]["msg"] == "value is not a valid IPv4 address"
def test_create_imported_office_router_endpoint(test_client, site_data, office_router_data):
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 2
def test_create_imported_office_router_endpoint_with_invalid_data(test_client, site_data, office_router_data):
response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
# invalid data, missing FQDN and invalid lo_ipv6_address
office_router_data.pop("office_router_fqdn")
office_router_data["office_router_lo_ipv6_address"] = "invalid"
response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 1
response = response.json()
assert response["detail"][0]["loc"] == ["body", "office_router_fqdn"]
assert response["detail"][0]["msg"] == "field required"
assert response["detail"][1]["loc"] == ["body", "office_router_lo_ipv6_address"]
assert response["detail"][1]["msg"] == "value is not a valid IPv6 address"
from test.fixtures import ( # noqa: F401
iptrunk_side_subscription_factory,
iptrunk_subscription_factory,
nokia_router_subscription_factory,
office_router_subscription_factory,
site_subscription_factory,
super_pop_switch_subscription_factory,
)
import json
from pathlib import Path
import pytest
from orchestrator.db import SubscriptionTable
from orchestrator.services.subscriptions import retrieve_subscription_by_subscription_instance_value
from gso.cli.imports import (
import_iptrunks,
import_office_routers,
import_routers,
import_sites,
import_super_pop_switches,
)
from gso.products import ProductType, Router, Site
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.services.subscriptions import (
get_active_iptrunk_subscriptions,
get_active_router_subscriptions,
get_subscriptions,
)
from gso.utils.helpers import iso_from_ipv4
from gso.utils.shared_enums import Vendor
##############
# FIXTURES #
##############
@pytest.fixture()
def temp_file(tmp_path) -> Path:
return tmp_path / "data.json"
@pytest.fixture()
def iptrunk_data(temp_file, nokia_router_subscription_factory, faker) -> (Path, dict):
def _iptrunk_data(
*,
ipv4_network=None,
ipv6_network=None,
min_links=None,
isis_metric=None,
side_a_node=None,
side_b_node=None,
side_a_members=None,
side_b_members=None,
side_a_ae_name=None,
side_b_ae_name=None,
):
router_side_a = nokia_router_subscription_factory()
router_side_b = nokia_router_subscription_factory()
ipv4_network = ipv4_network or str(faker.ipv4_network(max_subnet=31))
ipv6_network = ipv6_network or str(faker.ipv6_network(max_subnet=126))
iptrunk_data = {
"id": faker.geant_sid(),
"config": {
"common": {
"link_speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
"minimum_links": min_links or 3,
"isis_metric": isis_metric or 500,
"type": IptrunkType.DARK_FIBER,
},
"nodeA": {
"name": side_a_node or Router.from_subscription(router_side_a).router.router_fqdn,
"ae_name": side_a_ae_name or faker.network_interface(),
"port_sid": faker.geant_sid(),
"members": side_a_members
or [
{
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
}
for _ in range(5)
],
"ipv4_address": ipv4_network,
"ipv6_address": ipv6_network,
},
"nodeB": {
"name": side_b_node or Router.from_subscription(router_side_b).router.router_fqdn,
"ae_name": side_b_ae_name or faker.network_interface(),
"port_sid": faker.geant_sid(),
"members": side_b_members
or [
{
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
}
for _ in range(5)
],
"ipv4_address": ipv4_network,
"ipv6_address": ipv6_network,
},
},
}
temp_file.write_text(json.dumps([iptrunk_data]))
return {"path": str(temp_file), "data": iptrunk_data}
return _iptrunk_data
@pytest.fixture()
def site_data(faker, temp_file):
def _site_data(**kwargs):
site_data = {
"site_name": faker.site_name(),
"site_city": faker.city(),
"site_country": faker.country(),
"site_country_code": faker.country_code(),
"site_latitude": float(faker.latitude()),
"site_longitude": float(faker.longitude()),
"site_bgp_community_id": faker.pyint(),
"site_internal_id": faker.pyint(),
"site_tier": SiteTier.TIER1,
"site_ts_address": faker.ipv4(),
}
site_data.update(**kwargs)
temp_file.write_text(json.dumps([site_data]))
return {"path": str(temp_file), "data": site_data}
return _site_data
@pytest.fixture()
def router_data(temp_file, faker, site_subscription_factory):
def _router_data(**kwargs):
mock_ipv4 = faker.ipv4()
router_data = {
"router_site": Site.from_subscription(site_subscription_factory()).site.site_name,
"hostname": str(faker.ipv4()),
"ts_port": faker.port_number(),
"router_role": RouterRole.PE,
"router_vendor": Vendor.JUNIPER,
"router_lo_ipv4_address": mock_ipv4,
"router_lo_ipv6_address": str(faker.ipv6()),
"router_lo_iso_address": iso_from_ipv4(mock_ipv4),
}
router_data.update(**kwargs)
temp_file.write_text(json.dumps([router_data]))
return {"path": str(temp_file), "data": router_data}
return _router_data
@pytest.fixture()
def super_pop_switch_data(temp_file, faker, site_subscription_factory):
def _super_pop_switch_data(**kwargs):
super_pop_switch_data = {
"hostname": str(faker.ipv4()),
"super_pop_switch_site": Site.from_subscription(site_subscription_factory()).site.site_name,
"super_pop_switch_ts_port": faker.port_number(),
"super_pop_switch_mgmt_ipv4_address": str(faker.ipv4()),
}
super_pop_switch_data.update(**kwargs)
temp_file.write_text(json.dumps([super_pop_switch_data]))
return {"path": str(temp_file), "data": super_pop_switch_data}
return _super_pop_switch_data
@pytest.fixture()
def office_router_data(temp_file, faker, site_subscription_factory):
def _office_router_data(**kwargs):
office_router_data = {
"office_router_fqdn": faker.domain_name(levels=4),
"office_router_site": Site.from_subscription(site_subscription_factory()).site.site_name,
"office_router_ts_port": faker.port_number(),
"office_router_lo_ipv4_address": str(faker.ipv4()),
"office_router_lo_ipv6_address": str(faker.ipv6()),
}
office_router_data.update(**kwargs)
temp_file.write_text(json.dumps([office_router_data]))
return {"path": str(temp_file), "data": office_router_data}
return _office_router_data
###########
# TESTS #
###########
def test_import_iptrunk_success(iptrunk_data):
assert len(get_active_iptrunk_subscriptions()) == 0
import_iptrunks(iptrunk_data()["path"])
assert len(get_subscriptions([ProductType.IMPORTED_IP_TRUNK])) == 0
assert len(get_active_iptrunk_subscriptions()) == 1
def test_import_site_success(site_data):
assert SubscriptionTable.query.all() == []
mock_site_data = site_data()
import_sites(mock_site_data["path"])
assert len(get_subscriptions([ProductType.IMPORTED_SITE])) == 0
subscription = retrieve_subscription_by_subscription_instance_value(
resource_type="site_name", value=mock_site_data["data"]["site_name"]
)
assert subscription is not None
def test_import_site_twice(site_data, capfd):
path_location = site_data()["path"]
import_sites(path_location)
# Second identical import should print ValidationError to stdout
import_sites(path_location)
out, _ = capfd.readouterr()
assert (
"""Validation error: 4 validation errors for SiteImportModel
site_bgp_community_id
site_bgp_community_id must be unique (type=value_error)
site_internal_id
site_internal_id must be unique (type=value_error)
site_ts_address
site_ts_address must be unique (type=value_error)
site_name
site_name must be unique (type=value_error)"""
in out
)
assert SubscriptionTable.query.count() == 1
def test_import_site_with_invalid_data(site_data, capfd):
# invalid data, missing site_latitude and invalid site_longitude
incorrect_site_data = site_data(site_latitude=None, site_longitude="broken")
assert SubscriptionTable.query.count() == 0
import_sites(incorrect_site_data["path"])
out, _ = capfd.readouterr()
assert (
"""Validation error: 2 validation errors for SiteImportModel
site_latitude
none is not an allowed value (type=type_error.none.not_allowed)
site_longitude
value is not a valid float (type=type_error.float)"""
in out
)
assert SubscriptionTable.query.count() == 0
def test_import_router_success(site_subscription_factory, router_data):
assert SubscriptionTable.query.count() == 0
import_routers(router_data()["path"])
assert len(get_active_router_subscriptions()) == 1
def test_import_router_with_invalid_data(router_data, capfd):
broken_data = router_data(hostname="", router_lo_ipv6_address="Not an IP address")
import_routers(broken_data["path"])
# Only a Site has been added, no Router
assert SubscriptionTable.query.count() == 1
out, _ = capfd.readouterr()
# The extra space at the end of the next line is required, and not dangling by accident.
assert "Validation error: 1 validation error for RouterImportModel" in out
assert (
"""router_lo_ipv6_address
value is not a valid IPv6 address (type=value_error.ipv6address)"""
in out
)
def test_import_iptrunk_successful(iptrunk_data):
assert SubscriptionTable.query.count() == 0
import_iptrunks(iptrunk_data()["path"])
assert SubscriptionTable.query.count() == 5
assert len(get_active_iptrunk_subscriptions()) == 1
def test_import_iptrunk_invalid_router_id_side_a_and_b(iptrunk_data, capfd):
broken_data = iptrunk_data(side_a_node="Doesn't exist", side_b_node="Also doesn't exist")
import_iptrunks(broken_data["path"])
out, _ = capfd.readouterr()
assert SubscriptionTable.query.count() == 4
assert len(get_active_iptrunk_subscriptions()) == 0
assert (
"""Validation error: 2 validation errors for IptrunkImportModel
side_a_node_id
Router not found (type=value_error)
side_b_node_id
Router not found (type=value_error)"""
in out
)
def test_import_iptrunk_non_unique_members_side_a_and_b(iptrunk_data, faker, capfd):
duplicate_interface = {"interface_name": faker.network_interface(), "interface_description": faker.sentence()}
side_a_members = [duplicate_interface for _ in range(5)]
side_b_members = [duplicate_interface for _ in range(5)]
broken_data = iptrunk_data(side_a_members=side_a_members, side_b_members=side_b_members)
import_iptrunks(broken_data["path"])
out, _ = capfd.readouterr()
assert SubscriptionTable.query.count() == 4
assert len(get_active_iptrunk_subscriptions()) == 0
assert (
"""Validation error: 3 validation errors for IptrunkImportModel
side_a_ae_members
Items must be unique (type=value_error)
side_b_ae_members
Items must be unique (type=value_error)"""
in out
)
def test_import_iptrunk_side_a_member_count_mismatch(iptrunk_data, faker, capfd):
side_a_members = [
{"interface_name": faker.network_interface(), "interface_description": faker.sentence()} for _ in range(5)
]
side_b_members = [
{"interface_name": faker.network_interface(), "interface_description": faker.sentence()} for _ in range(6)
]
broken_data = iptrunk_data(side_a_members=side_a_members, side_b_members=side_b_members)
import_iptrunks(broken_data["path"])
out, _ = capfd.readouterr()
assert SubscriptionTable.query.count() == 4
assert len(get_active_iptrunk_subscriptions()) == 0
assert (
"""Validation error: 1 validation error for IptrunkImportModel
__root__
Mismatch between Side A and B members (type=value_error)"""
in out
)
def test_import_office_router_success(office_router_data):
assert SubscriptionTable.query.count() == 0
import_office_routers(office_router_data()["path"])
assert SubscriptionTable.query.count() == 2
assert len(get_subscriptions([ProductType.IMPORTED_OFFICE_ROUTER])) == 0
assert len(get_subscriptions([ProductType.OFFICE_ROUTER])) == 1
def test_import_super_pop_switch_success(super_pop_switch_data):
assert SubscriptionTable.query.count() == 0
import_super_pop_switches(super_pop_switch_data()["path"])
assert SubscriptionTable.query.count() == 2
assert len(get_subscriptions([ProductType.IMPORTED_SUPER_POP_SWITCH])) == 0
assert len(get_subscriptions([ProductType.SUPER_POP_SWITCH])) == 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment