Skip to content
Snippets Groups Projects
Commit 5474620f authored by Neda Moeini's avatar Neda Moeini
Browse files

Added Router import endpoints.

parent c0cd6c85
No related branches found
No related tags found
1 merge request!60Feature/nat 217 import sites
Pipeline #83851 failed
......@@ -5,9 +5,9 @@ from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from gso.api.api_v1.endpoints import import_site
from gso.api.api_v1.endpoints import imports
api_router = APIRouter()
api_router.include_router(
import_site.router, prefix="/import/site", dependencies=[Depends(opa_security_default)]
imports.router, prefix="/imports", dependencies=[Depends(opa_security_default)]
)
\ No newline at end of file
from typing import Dict, Any
import ipaddress
from typing import Dict, Any, Optional
from uuid import UUID
from fastapi import HTTPException, status
from fastapi.routing import APIRouter
from orchestrator.services import processes, subscriptions
from pydantic import BaseModel
from sqlalchemy.exc import MultipleResultsFound
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
router = APIRouter()
def start_process(process_name: str, data: dict) -> UUID:
"""Utility function to start a process and handle common exceptions."""
pid = processes.start_process(process_name, [data])
if pid is None:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")
process = processes._get_process(pid) # pylint: disable=protected-access
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
class SiteImport(BaseModel):
site_name: str
site_city: str
......@@ -23,28 +44,62 @@ class SiteImport(BaseModel):
site_ts_address: str
@router.post("/", status_code=status.HTTP_201_CREATED, tags=["Import"])
@router.post("/sites", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_site(site: SiteImport) -> Dict[str, Any]:
"""
Import site by running the import_site workflow.
response:
- pid: The process id of the started process.
Args:
- site: A SiteImport object containing site details.
Returns:
- A dictionary containing the detail message and the process id.
Raises:
HTTPException: If the site already exists or if there's an error in the process.
- HTTPException: If there's an error in the process or if the site already exists.
"""
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active"))
if subscription:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.")
try:
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active")
)
if subscription:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.")
except MultipleResultsFound:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.")
pid = processes.start_process("import_site", [site.dict()])
if pid is None:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")
pid = start_process("import_site", site.dict())
return {"detail": "Site import process started.", "pid": pid}
process = processes._get_process(pid) # pylint: disable=protected-access
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error.")
return {"pid": str(pid)}
class RouterImportModel(BaseModel):
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: Optional[bool] = None
router_access_via_ts: Optional[bool] = None
router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None
router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None
router_lo_iso_address: Optional[str] = None
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None
@router.post("/routers", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_router(router_data: RouterImportModel):
"""
Import router by running the import_router workflow.
Args:
- router_data: A RouterImportModel object containing router details.
Returns:
- A dictionary containing the detail message and the process id.
Raises:
- HTTPException: If there's an error in the process.
"""
pid = start_process("import_router", router_data.dict())
return {"detail": f"Router added successfully", "pid": pid}
from typing import Any, Dict, Generator
import requests
import typer
from pydantic import ValidationError
from gso.api.api_v1.endpoints.import_site import SiteImport, import_site
from gso.products.product_blocks.site import SiteTier
app: typer.Typer = typer.Typer()
def get_site_details() -> Generator[Dict[str, Any], None, None]:
site_list_url = "https://prod-inventory-provider01.geant.org/neteng/pops"
site_details_url_template = "https://prod-inventory-provider01.geant.org/neteng/pop/{site}"
site_list_response = requests.get(site_list_url)
site_list = site_list_response.json()
for site in site_list:
site_details_url = site_details_url_template.format(site=site)
site_details_response = requests.get(site_details_url)
yield site_details_response.json()
@app.command()
def import_from_inventory_provider():
"""
Import sites into GSO using Inventory Provider API.
"""
for site_details in get_site_details():
try:
mapped_site_details = {
"site_name": site_details["name"],
"site_city": site_details["city"],
"site_country": site_details["country"],
"site_latitude": site_details["latitude"],
"site_longitude": site_details["longitude"],
"site_internal_id": 0,
"site_country_code": "NL",
"site_bgp_community_id": 0,
"site_tier": SiteTier.tier1
}
initial_data = SiteImport(**mapped_site_details)
import_site(initial_data)
typer.echo(f"Successfully imported site: {initial_data.site_name}")
except ValidationError as e:
typer.echo(f"Validation error: {e}")
def import_sites():
"""Import sites from a source."""
# TODO: Implement this CLI command to import sites from a source.
typer.echo("Importing sites...")
from typing import Optional
class CustomerNotFoundError(Exception):
"""Exception raised when a customer is not found."""
pass
def all_customers() -> list[dict]:
return [
{
......@@ -15,4 +20,4 @@ def get_customer_by_name(name: str) -> Optional[dict]:
if customer["name"] == name:
return customer
return None
raise CustomerNotFoundError(f"Customer {name} not found")
......@@ -8,4 +8,5 @@ LazyWorkflowInstance("gso.workflows.iptrunk.modify_trunk_interface", "modify_tru
LazyWorkflowInstance("gso.workflows.iptrunk.terminate_iptrunk", "terminate_iptrunk")
LazyWorkflowInstance("gso.workflows.iptrunk.modify_isis_metric", "modify_isis_metric")
LazyWorkflowInstance("gso.workflows.site.create_site", "create_site")
LazyWorkflowInstance("gso.workflows.tasks.import_site", "import_site")
\ No newline at end of file
LazyWorkflowInstance("gso.workflows.tasks.import_site", "import_site")
LazyWorkflowInstance("gso.workflows.tasks.import_router", "import_router")
import ipaddress
from typing import Optional
from orchestrator import workflow
from orchestrator.db import (
ProductTable,
SubscriptionTable,
SubscriptionInstanceValueTable,
SubscriptionInstanceTable,
ResourceTypeTable,
)
from orchestrator.domain.base import S
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr, SubscriptionLifecycle
from orchestrator.workflow import StepList, init, step, done
from orchestrator.workflows.steps import store_process_subscription, set_status, resync
from gso.products import Site
from gso.products.product_blocks import router as router_pb
from gso.products.product_blocks.router import RouterVendor, RouterRole
from gso.products.product_types import router
from gso.products.product_types.router import RouterInactive
from gso.services.crm import get_customer_by_name
@step("Create subscription")
def create_subscription(customer: str) -> State:
customer_id: UUIDstr = get_customer_by_name(customer)["id"]
product_id: UUIDstr = ProductTable.query.filter_by(name="Router").first().product_id
subscription = RouterInactive.from_product_id(product_id, customer_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
def initial_input_form_generator() -> FormGenerator:
class ImportRouter(FormPage):
class Config:
title = "Import Router"
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: Optional[bool] = None
router_access_via_ts: Optional[bool] = None
router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None
router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None
router_lo_iso_address: Optional[str] = None
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None
user_input = yield ImportRouter
return user_input.dict()
def get_site_by_name(site_name: str) -> S:
subscription = (
SubscriptionTable.query.join(
ProductTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, ResourceTypeTable
)
.filter(SubscriptionInstanceValueTable.value == site_name)
.filter(ResourceTypeTable.resource_type == "site_name")
.filter(SubscriptionTable.status == "active")
.first()
)
if not subscription:
raise ValueError(f"Site with name {site_name} not found")
return Site.from_subscription(subscription.subscription_id)
@step("Initialize subscription")
def initialize_subscription(
subscription: RouterInactive,
hostname: str,
ts_port: int,
router_vendor: router_pb.RouterVendor,
router_site: str,
router_role: router_pb.RouterRole,
is_ias_connected: Optional[bool] = None,
router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None,
router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None,
router_lo_iso_address: Optional[str] = None,
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None,
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None,
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None,
) -> State:
subscription.router.router_ts_port = ts_port
subscription.router.router_vendor = router_vendor
subscription.router.router_site = get_site_by_name(router_site).site
fqdn = (
f"{hostname}.{subscription.router.router_site.site_name.lower()}."
f"{subscription.router.router_site.site_country_code.lower()}"
".geant.net"
)
subscription.router.router_fqdn = fqdn
subscription.router.router_role = router_role
subscription.router.router_access_via_ts = True
subscription.description = f"Router {fqdn}"
subscription.router.router_is_ias_connected = is_ias_connected
subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
subscription.router.router_lo_iso_address = router_lo_iso_address
subscription.router.router_si_ipv4_network = router_si_ipv4_network
subscription.router.router_ias_lt_ipv4_network = router_ias_lt_ipv4_network
subscription.router.router_ias_lt_ipv6_network = router_ias_lt_ipv6_network
subscription = router.RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@workflow(
"Import router",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def import_router() -> StepList:
return (
init
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment