Skip to content
Snippets Groups Projects
Verified Commit 42f095de authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Make import CLI command also migrate imported subscriptions to real ones

parent dd0e3d38
No related branches found
No related tags found
1 merge request!201Add imported products
......@@ -10,23 +10,19 @@ from typing import TypeVar
import typer
import yaml
from orchestrator.db import db
from orchestrator.services.processes import start_process
from orchestrator.types import SubscriptionLifecycle
from pydantic import ValidationError
from sqlalchemy.exc import SQLAlchemyError
from gso.api.v1.imports import (
IptrunkImportModel,
OfficeRouterImportModel,
RouterImportModel,
SiteImportModel,
SuperPopSwitchImportModel,
create_imported_iptrunk,
create_imported_office_router,
create_imported_router,
create_imported_site,
create_imported_super_pop_switch,
)
from gso.api.v1.imports.iptrunk import IptrunkImportModel, create_imported_iptrunk
from gso.api.v1.imports.office_router import OfficeRouterImportModel, create_imported_office_router
from gso.api.v1.imports.router import RouterImportModel, create_imported_router
from gso.api.v1.imports.site import SiteImportModel, create_imported_site
from gso.api.v1.imports.super_pop_switch import SuperPopSwitchImportModel, create_imported_super_pop_switch
from gso.db.models import PartnerTable
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
from gso.products import ProductType
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value, get_subscriptions
app: typer.Typer = typer.Typer()
......@@ -40,7 +36,7 @@ common_filepath_option = typer.Option(
)
def read_data(filepath: str) -> dict:
def _read_data(filepath: str) -> dict:
"""Read data from a JSON or YAML file."""
typer.echo(f"Starting import from {filepath}")
file_path = Path(filepath)
......@@ -58,7 +54,7 @@ def read_data(filepath: str) -> dict:
raise typer.Exit(code=1)
def generic_import_data(
def _generic_import_data(
filepath: str,
import_model: type[T],
import_function: callable, # type: ignore[valid-type]
......@@ -66,7 +62,7 @@ def generic_import_data(
) -> None:
"""Import data from a JSON or YAML file."""
successfully_imported_data = []
data = read_data(filepath)
data = _read_data(filepath)
for details in data:
details["partner"] = "GEANT"
typer.echo(f"Importing {name_key}: {details[name_key]}")
......@@ -86,67 +82,86 @@ def generic_import_data(
typer.echo(f"- {item}")
def _get_router_subscription_id(node_name: str) -> str | None:
"""Get the subscription id for a router by its node name."""
subscriptions = get_active_subscriptions_by_field_and_value(
"router_fqdn",
node_name,
)
if subscriptions:
return str(subscriptions[0].subscription_id)
return None
def _import_partners_from_csv(file_path: Path) -> list[dict]:
"""Read partners from a CSV file."""
with Path.open(file_path, encoding="utf-8") as csv_file:
csv_reader = csv.DictReader(csv_file)
return list(csv_reader)
@app.command()
def create_imported_sites(filepath: str = common_filepath_option) -> None:
def import_sites(filepath: str = common_filepath_option) -> None:
"""Import sites into GSO."""
# Use the import_data function to handle common import logic
generic_import_data(filepath, SiteImportModel, create_imported_site, "site_name")
_generic_import_data(filepath, SiteImportModel, create_imported_site, "site_name")
site_ids = get_subscriptions(
[ProductType.IMPORTED_SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in site_ids:
start_process("import_site", [subscription_id])
@app.command()
def create_imported_routers(filepath: str = common_filepath_option) -> None:
def import_routers(filepath: str = common_filepath_option) -> None:
"""Import routers into GSO."""
# Use the import_data function to handle common import logic
generic_import_data(filepath, RouterImportModel, create_imported_router, "hostname")
_generic_import_data(filepath, RouterImportModel, create_imported_router, "hostname")
router_ids = get_subscriptions(
[ProductType.IMPORTED_ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in router_ids:
start_process("import_router", [subscription_id])
@app.command()
def create_imported_super_pop_switches(filepath: str = common_filepath_option) -> None:
def import_super_pop_switches(filepath: str = common_filepath_option) -> None:
"""Import Super PoP Switches into GSO."""
# Use the import_data function to handle common import logic
generic_import_data(filepath, SuperPopSwitchImportModel, create_imported_super_pop_switch, "hostname")
_generic_import_data(filepath, SuperPopSwitchImportModel, create_imported_super_pop_switch, "hostname")
super_pop_switch_ids = get_subscriptions(
[ProductType.IMPORTED_SUPER_POP_SWITCH], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in super_pop_switch_ids:
start_process("import_super_pop_switch", [subscription_id])
@app.command()
def create_imported_office_routers(filepath: str = common_filepath_option) -> None:
def import_office_routers(filepath: str = common_filepath_option) -> None:
"""Import office routers into GSO."""
# Use the import_data function to handle common import logic
generic_import_data(filepath, OfficeRouterImportModel, create_imported_office_router, "office_router_fqdn")
_generic_import_data(filepath, OfficeRouterImportModel, create_imported_office_router, "office_router_fqdn")
def get_router_subscription_id(node_name: str) -> str | None:
"""Get the subscription id for a router by its node name."""
subscriptions = get_active_subscriptions_by_field_and_value(
"router_fqdn",
node_name,
office_router_ids = get_subscriptions(
[ProductType.IMPORTED_OFFICE_ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
if subscriptions:
return str(subscriptions[0].subscription_id)
return None
for subscription_id in office_router_ids:
start_process("import_office_router", [subscription_id])
@app.command()
def create_imported_iptrunks(filepath: str = common_filepath_option) -> None:
def import_iptrunks(filepath: str = common_filepath_option) -> None:
"""Import IP trunks into GSO."""
successfully_imported_data = []
data = read_data(filepath)
data = _read_data(filepath)
for trunk in data:
ipv4_network_a = ipaddress.ip_network(
trunk["config"]["nodeA"]["ipv4_address"],
strict=False,
)
ipv4_network_b = ipaddress.ip_network(
trunk["config"]["nodeB"]["ipv4_address"],
strict=False,
)
ipv6_network_a = ipaddress.ip_network(
trunk["config"]["nodeA"]["ipv6_address"],
strict=False,
)
ipv6_network_b = ipaddress.ip_network(
trunk["config"]["nodeB"]["ipv6_address"],
strict=False,
)
ipv4_network_a = ipaddress.IPv4Network(trunk["config"]["nodeA"]["ipv4_address"], strict=False)
ipv4_network_b = ipaddress.IPv4Network(trunk["config"]["nodeB"]["ipv4_address"], strict=False)
ipv6_network_a = ipaddress.IPv6Network(trunk["config"]["nodeA"]["ipv6_address"], strict=False)
ipv6_network_b = ipaddress.IPv6Network(trunk["config"]["nodeB"]["ipv6_address"], strict=False)
# Check if IPv4 networks are equal
if ipv4_network_a == ipv4_network_b:
iptrunk_ipv4_network = ipv4_network_a
......@@ -175,22 +190,16 @@ def create_imported_iptrunks(filepath: str = common_filepath_option) -> None:
iptrunk_speed=trunk["config"]["common"]["link_speed"],
iptrunk_minimum_links=trunk["config"]["common"]["minimum_links"],
iptrunk_isis_metric=trunk["config"]["common"]["isis_metric"],
side_a_node_id=get_router_subscription_id(
trunk["config"]["nodeA"]["name"],
)
or "",
side_a_node_id=_get_router_subscription_id(trunk["config"]["nodeA"]["name"]) or "",
side_a_ae_iface=trunk["config"]["nodeA"]["ae_name"],
side_a_ae_geant_a_sid=trunk["config"]["nodeA"]["port_sid"],
side_a_ae_members=trunk["config"]["nodeA"]["members"],
side_b_node_id=get_router_subscription_id(
trunk["config"]["nodeB"]["name"],
)
or "",
side_b_node_id=_get_router_subscription_id(trunk["config"]["nodeB"]["name"]) or "",
side_b_ae_iface=trunk["config"]["nodeB"]["ae_name"],
side_b_ae_geant_a_sid=trunk["config"]["nodeB"]["port_sid"],
side_b_ae_members=trunk["config"]["nodeB"]["members"],
iptrunk_ipv4_network=iptrunk_ipv4_network, # type:ignore[arg-type]
iptrunk_ipv6_network=iptrunk_ipv6_network, # type:ignore[arg-type]
iptrunk_ipv4_network=iptrunk_ipv4_network,
iptrunk_ipv6_network=iptrunk_ipv6_network,
)
create_imported_iptrunk(initial_data)
successfully_imported_data.append(trunk["id"])
......@@ -198,25 +207,24 @@ def create_imported_iptrunks(filepath: str = common_filepath_option) -> None:
except ValidationError as e:
typer.echo(f"Validation error: {e}")
trunk_ids = get_subscriptions(
[ProductType.IMPORTED_IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in trunk_ids:
start_process("import_iptrunk", [subscription_id])
if successfully_imported_data:
typer.echo("Successfully imported IP Trunks:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
def import_partners_from_csv(file_path: Path) -> list[dict]:
"""Read partners from a CSV file."""
with Path.open(file_path, encoding="utf-8") as csv_file:
csv_reader = csv.DictReader(csv_file)
return list(csv_reader)
@app.command()
def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV file containing partners")) -> None:
"""Import partners from a CSV file into the database."""
typer.echo(f"Importing partners from {file_path} ...")
partners = import_partners_from_csv(Path(file_path))
partners = _import_partners_from_csv(Path(file_path))
try:
for partner in partners:
......
......@@ -15,7 +15,7 @@ from gso.services.subscriptions import get_product_id_by_name
def import_iptrunk_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedIptrunk subscription, and turn it into an Iptrunk subscription."""
old_iptrunk = ImportedIptrunk.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription_id = get_product_id_by_name(ProductName.IP_TRUNK)
new_subscription = Iptrunk.from_other_product(old_iptrunk, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -15,7 +15,7 @@ from gso.services.subscriptions import get_product_id_by_name
def import_office_router_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedOfficeRouter subscription, and turn it into an OfficeRouter subscription."""
old_office_router = ImportedOfficeRouter.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription_id = get_product_id_by_name(ProductName.OFFICE_ROUTER)
new_subscription = OfficeRouter.from_other_product(old_office_router, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -15,7 +15,7 @@ from gso.services.subscriptions import get_product_id_by_name
def import_router_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedRouter subscription, and turn it into a Router subscription."""
old_router = ImportedRouter.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription_id = get_product_id_by_name(ProductName.ROUTER)
new_subscription = Router.from_other_product(old_router, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -15,7 +15,7 @@ from gso.services.subscriptions import get_product_id_by_name
def import_super_pop_switch_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedSuperPopSwitch subscription, and turn it into a SuperPopSwitch subscription."""
old_super_pop_switch = ImportedSuperPopSwitch.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription_id = get_product_id_by_name(ProductName.SUPER_POP_SWITCH)
new_subscription = SuperPopSwitch.from_other_product(old_super_pop_switch, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment