Newer
Older
""":term:`CLI` command for importing data to coreDB."""
import ipaddress
import json
from pathlib import Path
from typing import TypeVar
from pydantic import ValidationError
from gso.api.v1.imports import (
IptrunkImportModel,
)
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
app: typer.Typer = typer.Typer()
T = TypeVar("T", SiteImportModel, RouterImportModel, IptrunkImportModel)
common_filepath_option = typer.Option(
default="data.json",
help="Path to the file",
)
def read_data(filepath: str) -> dict:
"""Read data from a JSON or YAML file."""
typer.echo(f"Starting import from {filepath}")
file_path = Path(filepath)
file_extension = file_path.suffix.lower()
with file_path.open("r") as f:
supported_extensions = {".json", ".yaml", ".yml"}
if file_extension == ".json":
typer.echo(f"Unsupported file format: {file_extension}")
raise typer.Exit(code=1)
def generic_import_data(
filepath: str,
import_model: type[T],
import_function: callable, # type: ignore[valid-type]
name_key: str,
) -> None:
"""Import data from a JSON or YAML file."""
successfully_imported_data = []
data = read_data(filepath)
for details in data:
details["customer"] = "GÉANT"
typer.echo(f"Importing {name_key}: {details[name_key]}")
try:
initial_data = import_model(**details)
import_function(initial_data) # type: ignore[misc]
successfully_imported_data.append(getattr(initial_data, name_key))
typer.echo(
f"Successfully imported {name_key}: {getattr(initial_data, name_key)}",
)
except ValidationError as e:
typer.echo(f"Validation error: {e}")
if successfully_imported_data:
typer.echo(f"Successfully imported {name_key}s:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
@app.command()
def import_sites(filepath: str = common_filepath_option) -> None:
# Use the import_data function to handle common import logic
generic_import_data(filepath, SiteImportModel, import_site, "site_name")
@app.command()
def import_routers(filepath: str = common_filepath_option) -> None:
# Use the import_data function to handle common import logic
generic_import_data(filepath, RouterImportModel, import_router, "hostname")
def get_router_subscription_id(node_name: str) -> str | None:
"""Get the subscription id for a router by its node name."""
subscriptions = get_active_subscriptions_by_field_and_value(
"router_fqdn",
node_name,
)
if subscriptions:
return str(subscriptions[0].subscription_id)
def import_iptrunks(filepath: str = common_filepath_option) -> None:
successfully_imported_data = []
data = read_data(filepath)
for trunk in data:
ipv4_network_a = ipaddress.ip_network(
trunk["config"]["nodeA"]["ipv4_address"],
strict=False,
)
ipv4_network_b = ipaddress.ip_network(
trunk["config"]["nodeB"]["ipv4_address"],
strict=False,
)
ipv6_network_a = ipaddress.ip_network(
trunk["config"]["nodeA"]["ipv6_address"],
strict=False,
)
ipv6_network_b = ipaddress.ip_network(
trunk["config"]["nodeB"]["ipv6_address"],
strict=False,
)
# Check if IPv4 networks are equal
if ipv4_network_a == ipv4_network_b:
iptrunk_ipv4_network = ipv4_network_a
else:
# Handle the case where IPv4 networks are different
typer.echo(f"Error: IPv4 networks are different for trunk {trunk['id']}.")
continue
# Check if IPv6 networks are the same
if ipv6_network_a == ipv6_network_b:
iptrunk_ipv6_network = ipv6_network_a
else:
# Handle the case where IPv6 networks are different
typer.echo(f"Error: IPv6 networks are different for trunk {trunk['id']}.")
continue
typer.echo(
f"Importing IP Trunk: "
f'{get_active_subscriptions_by_field_and_value("router_fqdn", trunk["config"]["nodeA"]["name"])}',
)
try:
initial_data = IptrunkImportModel(
customer="GÉANT",
geant_s_sid=trunk["id"],
iptrunk_type=trunk["config"]["common"]["type"],
iptrunk_description=trunk["config"]["common"].get("description", ""),
iptrunk_speed=trunk["config"]["common"]["link_speed"],
iptrunk_minimum_links=trunk["config"]["common"]["minimum_links"],
iptrunk_isis_metric=trunk["config"]["common"]["isis_metric"],
side_a_node_id=get_router_subscription_id(
trunk["config"]["nodeA"]["name"],
)
or "",
side_a_ae_iface=trunk["config"]["nodeA"]["ae_name"],
side_a_ae_geant_a_sid=trunk["config"]["nodeA"]["port_sid"],
side_a_ae_members=trunk["config"]["nodeA"]["members"],
side_b_node_id=get_router_subscription_id(
trunk["config"]["nodeB"]["name"],
)
or "",
side_b_ae_iface=trunk["config"]["nodeB"]["ae_name"],
side_b_ae_geant_a_sid=trunk["config"]["nodeB"]["port_sid"],
side_b_ae_members=trunk["config"]["nodeB"]["members"],
iptrunk_ipv4_network=iptrunk_ipv4_network, # type:ignore[arg-type]
iptrunk_ipv6_network=iptrunk_ipv6_network, # type:ignore[arg-type]
)
import_iptrunk(initial_data)
successfully_imported_data.append(trunk["id"])
typer.echo(f"Successfully imported IP Trunk: {trunk['id']}")
except ValidationError as e:
typer.echo(f"Validation error: {e}")

Neda Moeini
committed
if successfully_imported_data:
typer.echo("Successfully imported IP Trunks:")
for item in successfully_imported_data:
typer.echo(f"- {item}")