Skip to content
Snippets Groups Projects
Commit 29d1d777 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

added iptrunk import api

parent a1228219
Branches
Tags
1 merge request!63added iptrunk import api
Pipeline #83949 failed
Showing
with 388 additions and 123 deletions
...@@ -11,7 +11,7 @@ run-tox-pipeline: ...@@ -11,7 +11,7 @@ run-tox-pipeline:
stage: tox stage: tox
tags: tags:
- docker-executor - docker-executor
image: python:3.10 image: python:3.11
services: services:
- postgres:15.4 - postgres:15.4
......
from fastapi import APIRouter
from gso.api.v1 import router as router_v1
router = APIRouter()
router.include_router(router_v1, prefix="/v1")
"""Module that implements process related API endpoints."""
from fastapi.param_functions import Depends
from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from gso.api.api_v1.endpoints import imports
api_router = APIRouter()
api_router.include_router(imports.router, prefix="/imports", dependencies=[Depends(opa_security_default)])
from fastapi import APIRouter
from gso.api.v1.imports import router as imports_router
router = APIRouter()
router.include_router(imports_router)
import ipaddress from typing import Any, Dict
from typing import Any, Dict, Optional
from uuid import UUID from uuid import UUID
from fastapi import HTTPException, status from fastapi import Depends, HTTPException, status
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from orchestrator.services import processes, subscriptions from orchestrator.services import processes, subscriptions
from pydantic import BaseModel
from sqlalchemy.exc import MultipleResultsFound from sqlalchemy.exc import MultipleResultsFound
from gso.products.product_blocks.router import RouterRole, RouterVendor from gso.schemas.imports import ImportResponseModel, IptrunkImportModel, RouterImportModel, SiteImportModel
from gso.products.product_blocks.site import SiteTier
router = APIRouter() router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
def start_process(process_name: str, data: dict) -> UUID: def _start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions.""" """Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data]) pid: UUID = processes.start_process(process_name, [data])
...@@ -31,27 +29,13 @@ def start_process(process_name: str, data: dict) -> UUID: ...@@ -31,27 +29,13 @@ def start_process(process_name: str, data: dict) -> UUID:
return pid return pid
class SiteImport(BaseModel): @router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
site_name: str def import_site(site: SiteImportModel) -> Dict[str, Any]:
site_city: str """Import a site by running the import_site workflow.
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
customer: str
@router.post("/sites", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_site(site: SiteImport) -> Dict[str, Any]:
"""Import site by running the import_site workflow.
Args: Args:
---- ----
site (SiteImport): The site information to be imported. site (SiteImportModel): The site information to be imported.
Returns: Returns:
------- -------
...@@ -70,27 +54,11 @@ def import_site(site: SiteImport) -> Dict[str, Any]: ...@@ -70,27 +54,11 @@ def import_site(site: SiteImport) -> Dict[str, Any]:
except MultipleResultsFound: except MultipleResultsFound:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.") raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.")
pid = start_process("import_site", site.dict()) pid = _start_process("import_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid} return {"detail": "Site added successfully.", "pid": pid}
class RouterImportModel(BaseModel): @router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None
@router.post("/routers", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_router(router_data: RouterImportModel) -> Dict[str, Any]: def import_router(router_data: RouterImportModel) -> Dict[str, Any]:
"""Import a router by running the import_router workflow. """Import a router by running the import_router workflow.
...@@ -107,5 +75,26 @@ def import_router(router_data: RouterImportModel) -> Dict[str, Any]: ...@@ -107,5 +75,26 @@ def import_router(router_data: RouterImportModel) -> Dict[str, Any]:
HTTPException: If there's an error in the process. HTTPException: If there's an error in the process.
""" """
pid = start_process("import_router", router_data.dict()) pid = _start_process("import_router", router_data.dict())
return {"detail": "Router added successfully", "pid": pid} return {"detail": "Router added successfully", "pid": pid}
@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_iptrunk(iptrunk_data: IptrunkImportModel) -> Dict[str, Any]:
"""Import an iptrunk by running the import_iptrunk workflow.
Args:
----
iptrunk_data (IptrunkImportModel): The iptrunk information to be imported.
Returns:
-------
dict: A dictionary containing the process id of the started process and detail message.
Raises:
------
HTTPException: If there's an error in the process.
"""
pid = _start_process("import_iptrunk", iptrunk_data.dict())
return {"detail": "Iptrunk added successfully", "pid": pid}
...@@ -7,7 +7,7 @@ from orchestrator.settings import AppSettings ...@@ -7,7 +7,7 @@ from orchestrator.settings import AppSettings
import gso.products # noqa: F401 import gso.products # noqa: F401
import gso.workflows # noqa: F401 import gso.workflows # noqa: F401
from gso import load_gso_cli from gso import load_gso_cli
from gso.api.api_v1.api import api_router from gso.api import router as api_router
def init_gso_app(settings: AppSettings) -> OrchestratorCore: def init_gso_app(settings: AppSettings) -> OrchestratorCore:
......
...@@ -12,3 +12,5 @@ SUBSCRIPTION_MODEL_REGISTRY.update( ...@@ -12,3 +12,5 @@ SUBSCRIPTION_MODEL_REGISTRY.update(
"IP trunk": Iptrunk, "IP trunk": Iptrunk,
} }
) )
__all__ = ["Site", "Iptrunk", "Router"]
...@@ -3,20 +3,16 @@ ...@@ -3,20 +3,16 @@
In this file, some enumerators may be declared that are available for use across all subscriptions. In this file, some enumerators may be declared that are available for use across all subscriptions.
""" """
from enum import Enum from enum import StrEnum
class PhyPortCapacity(Enum): class PhyPortCapacity(StrEnum):
"""Physical port capacity enumerator. """Physical port capacity enumerator.
An enumerator that has the different possible capacities of ports that are available to use in subscriptions. An enumerator that has the different possible capacities of ports that are available to use in subscriptions.
""" """
ONE = "1G" ONE_GIGABIT_PER_SECOND = "1G"
"""1Gbps""" TEN_GIGABIT_PER_SECOND = "10G"
TEN = "10G" HUNDRED_GIGABIT_PER_SECOND = "100G"
"""10Gbps""" FOUR_HUNDRED_GIGABIT_PER_SECOND = "400G"
HUNDRED = "100G"
"""100Gbps"""
FOUR_HUNDRED = "400G"
"""400Gbps"""
from uuid import UUID
from asyncio_redis import Subscription
from orchestrator.db import (
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from gso.schemas.enums import ProductType, SubscriptionStatus
def all_active_subscriptions(
product_type: str,
fields: list[str],
) -> list[Subscription]:
dynamic_fields = [getattr(SubscriptionTable, field) for field in fields]
return (
SubscriptionTable.query.join(ProductTable)
.filter(
ProductTable.product_type == product_type,
SubscriptionTable.status == SubscriptionStatus.ACTIVE,
)
.with_entities(*dynamic_fields)
.all()
)
def all_active_site_subscriptions(fields: list[str]) -> list[Subscription]:
return all_active_subscriptions(ProductType.SITE, fields)
def site_product_id() -> UUID:
return ProductTable.query.filter_by(name=ProductType.SITE).first().product_id
def active_site_subscription_by_name(site_name: str) -> Subscription:
return (
SubscriptionTable.query.join(
ProductTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, ResourceTypeTable
)
.filter(SubscriptionInstanceValueTable.value == site_name)
.filter(ResourceTypeTable.resource_type == "site_name")
.filter(SubscriptionTable.status == SubscriptionStatus.ACTIVE)
.first()
)
def iptrunk_product_id() -> UUID:
return ProductTable.query.filter_by(name=ProductType.IP_TRUNK).first().product_id
def all_active_router_subscriptions(fields: list[str]) -> list[Subscription]:
return all_active_subscriptions(product_type=ProductType.ROUTER, fields=fields)
def router_product_id() -> UUID:
return ProductTable.query.filter_by(name=ProductType.ROUTER).first().product_id
File moved
from enum import StrEnum
class ProductType(StrEnum):
SITE = "Site"
ROUTER = "Router"
IP_TRUNK = "IP trunk"
class SubscriptionStatus(StrEnum):
ACTIVE = "active"
import ipaddress
from typing import Any
from uuid import UUID
from pydantic import BaseModel, root_validator, validator
from gso import repository
from gso.products.product_blocks import PhyPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
from gso.services.crm import CustomerNotFoundError, get_customer_id_by_name
class ImportResponseModel(BaseModel):
pid: UUID
detail: str
class SiteImportModel(BaseModel):
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
customer: str
class RouterImportModel(BaseModel):
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_si_ipv4_network: ipaddress.IPv4Network | None = None
router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None
router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None
class IptrunkImportModel(BaseModel):
customer: str
geant_s_sid: str
iptrunk_type: IptrunkType
iptrunk_description: str
iptrunk_speed: PhyPortCapacity
iptrunk_minimum_links: int
iptrunk_sideA_node_id: str
iptrunk_sideA_ae_iface: str
iptrunk_sideA_ae_geant_a_sid: str
iptrunk_sideA_ae_members: list[str]
iptrunk_sideA_ae_members_descriptions: list[str]
iptrunk_sideB_node_id: str
iptrunk_sideB_ae_iface: str
iptrunk_sideB_ae_geant_a_sid: str
iptrunk_sideB_ae_members: list[str]
iptrunk_sideB_ae_members_descriptions: list[str]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
@classmethod
def _get_active_routers(cls) -> set[str]:
return {str(router_id) for router_id in repository.all_active_router_subscriptions(fields=["subscription_id"])}
@validator("customer")
def check_if_customer_exists(cls, value: str) -> str:
try:
get_customer_id_by_name(value)
except CustomerNotFoundError:
raise ValueError(f"Customer {value} not found")
return value
@validator("iptrunk_sideA_node_id", "iptrunk_sideB_node_id")
def check_if_router_side_is_available(cls, value: str) -> str:
if value not in cls._get_active_routers():
raise ValueError("Router not found")
return value
@validator("iptrunk_sideA_ae_members", "iptrunk_sideB_ae_members")
def check_side_uniqueness(cls, value: list[str]) -> list[str]:
if len(value) != len(set(value)):
raise ValueError("Items must be unique")
return value
@root_validator
def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
min_links = values["iptrunk_minimum_links"]
side_a_members = values.get("iptrunk_sideA_ae_members", [])
side_a_descriptions = values.get("iptrunk_sideA_ae_members_descriptions", [])
side_b_members = values.get("iptrunk_sideB_ae_members", [])
side_b_descriptions = values.get("iptrunk_sideB_ae_members_descriptions", [])
len_a = len(side_a_members)
len_a_desc = len(side_a_descriptions)
len_b = len(side_b_members)
len_b_desc = len(side_b_descriptions)
if len_a < min_links:
raise ValueError(f"Side A members should be at least {min_links} (iptrunk_minimum_links)")
if len_a != len_a_desc:
raise ValueError("Mismatch in Side A members and their descriptions")
if len_a != len_b:
raise ValueError("Mismatch between Side A and B members")
if len_a != len_b_desc:
raise ValueError("Mismatch in Side B members and their descriptions")
return values
...@@ -11,7 +11,7 @@ def all_customers() -> list[dict]: ...@@ -11,7 +11,7 @@ def all_customers() -> list[dict]:
return [ return [
{ {
"id": "8f0df561-ce9d-4d9c-89a8-7953d3ffc961", "id": "8f0df561-ce9d-4d9c-89a8-7953d3ffc961",
"name": "Geant", "name": "GÉANT",
}, },
] ]
...@@ -22,3 +22,7 @@ def get_customer_by_name(name: str) -> Dict[str, Any]: ...@@ -22,3 +22,7 @@ def get_customer_by_name(name: str) -> Dict[str, Any]:
return customer return customer
raise CustomerNotFoundError(f"Customer {name} not found") raise CustomerNotFoundError(f"Customer {name} not found")
def get_customer_id_by_name(name: str) -> str:
return get_customer_by_name(name)["id"]
from orchestrator.db import (
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from gso.products.product_types.site import Site
def get_site_by_name(site_name: str) -> Site:
"""Get a site by its name.
Args:
----
site_name (str): The name of the site.
"""
subscription = (
SubscriptionTable.query.join(
ProductTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, ResourceTypeTable
)
.filter(SubscriptionInstanceValueTable.value == site_name)
.filter(ResourceTypeTable.resource_type == "site_name")
.filter(SubscriptionTable.status == "active")
.first()
)
if not subscription:
raise ValueError(f"Site with name {site_name} not found.")
return Site.from_subscription(subscription.subscription_id)
...@@ -10,3 +10,4 @@ LazyWorkflowInstance("gso.workflows.iptrunk.modify_isis_metric", "modify_isis_me ...@@ -10,3 +10,4 @@ LazyWorkflowInstance("gso.workflows.iptrunk.modify_isis_metric", "modify_isis_me
LazyWorkflowInstance("gso.workflows.site.create_site", "create_site") LazyWorkflowInstance("gso.workflows.site.create_site", "create_site")
LazyWorkflowInstance("gso.workflows.tasks.import_site", "import_site") LazyWorkflowInstance("gso.workflows.tasks.import_site", "import_site")
LazyWorkflowInstance("gso.workflows.tasks.import_router", "import_router") LazyWorkflowInstance("gso.workflows.tasks.import_router", "import_router")
LazyWorkflowInstance("gso.workflows.tasks.import_iptrunk", "import_iptrunk")
from orchestrator.db.models import ProductTable, SubscriptionTable
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.targets import Target from orchestrator.targets import Target
...@@ -7,6 +6,7 @@ from orchestrator.workflow import StepList, done, init, step, workflow ...@@ -7,6 +6,7 @@ from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form from orchestrator.workflows.utils import wrap_create_initial_input_form
from gso import repository
from gso.products.product_blocks import PhyPortCapacity from gso.products.product_blocks import PhyPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
...@@ -21,14 +21,8 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: ...@@ -21,14 +21,8 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
# * interface names must be validated # * interface names must be validated
routers = {} routers = {}
for router_id, router_description in ( for router_id, router_description in repository.all_active_router_subscriptions(
SubscriptionTable.query.join(ProductTable) fields=["subscription_id", "description"]
.filter(
ProductTable.product_type == "Router",
SubscriptionTable.status == "active",
)
.with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description)
.all()
): ):
routers[str(router_id)] = router_description routers[str(router_id)] = router_description
......
import ipaddress import ipaddress
import re import re
from orchestrator.db.models import ProductTable, SubscriptionTable
# noinspection PyProtectedMember # noinspection PyProtectedMember
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice from orchestrator.forms.validators import Choice
...@@ -12,6 +10,7 @@ from orchestrator.workflow import StepList, done, init, step, workflow ...@@ -12,6 +10,7 @@ from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form from orchestrator.workflows.utils import wrap_create_initial_input_form
from gso import repository
from gso.products.product_blocks import router as router_pb from gso.products.product_blocks import router as router_pb
from gso.products.product_types import router from gso.products.product_types import router
from gso.products.product_types.router import RouterInactive, RouterProvisioning from gso.products.product_types.router import RouterInactive, RouterProvisioning
...@@ -23,14 +22,8 @@ from gso.workflows.utils import customer_selector ...@@ -23,14 +22,8 @@ from gso.workflows.utils import customer_selector
def site_selector() -> Choice: def site_selector() -> Choice:
site_subscriptions = {} site_subscriptions = {}
for site_id, site_description in ( for site_id, site_description in repository.all_active_site_subscriptions(
SubscriptionTable.query.join(ProductTable) fields=["subscription_id", "description"]
.filter(
ProductTable.product_type == "Site",
SubscriptionTable.status == "active",
)
.with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description)
.all()
): ):
site_subscriptions[str(site_id)] = site_description site_subscriptions[str(site_id)] = site_description
......
import ipaddress
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, done, init, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from gso import repository
from gso.products.product_blocks import PhyPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.services.crm import get_customer_id_by_name
from gso.workflows.iptrunk.create_iptrunk import initialize_subscription
def _generate_routers() -> dict[str, str]:
"""Generate a dictionary of router IDs and descriptions."""
routers = {}
for router_id, router_description in repository.all_active_router_subscriptions(
fields=["subscription_id", "description"]
):
routers[str(router_id)] = router_description
return routers
def initial_input_form_generator() -> FormGenerator:
routers = _generate_routers()
RouterEnum = Choice("Select a router", zip(routers.keys(), routers.items())) # type: ignore
class CreateIptrunkForm(FormPage):
class Config:
title = "Import Iptrunk"
customer: str
geant_s_sid: str
iptrunk_description: str
iptrunk_type: IptrunkType
iptrunk_speed: PhyPortCapacity
iptrunk_minimum_links: int
iptrunk_sideA_node_id: RouterEnum # type: ignore
iptrunk_sideA_ae_iface: str
iptrunk_sideA_ae_geant_a_sid: str
iptrunk_sideA_ae_members: UniqueConstrainedList[str]
iptrunk_sideA_ae_members_descriptions: UniqueConstrainedList[str]
iptrunk_sideB_node_id: RouterEnum # type: ignore
iptrunk_sideB_ae_iface: str
iptrunk_sideB_ae_geant_a_sid: str
iptrunk_sideB_ae_members: UniqueConstrainedList[str]
iptrunk_sideB_ae_members_descriptions: UniqueConstrainedList[str]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
initial_user_input = yield CreateIptrunkForm
return initial_user_input.dict()
@step("Create a new subscription")
def create_subscription(customer: str) -> State:
customer_id = get_customer_id_by_name(customer)
product_id = repository.iptrunk_product_id()
subscription = IptrunkInactive.from_product_id(product_id, customer_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
@step("Update IPAM Stub for Subscription")
def update_ipam_stub_for_subscription(
subscription: IptrunkProvisioning,
iptrunk_ipv4_network: ipaddress.IPv4Network,
iptrunk_ipv6_network: ipaddress.IPv6Network,
) -> State:
subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
return {"subscription": subscription}
@workflow(
"Import iptrunk",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def import_iptrunk() -> StepList:
return (
init
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> update_ipam_stub_for_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
...@@ -3,25 +3,39 @@ from typing import Optional ...@@ -3,25 +3,39 @@ from typing import Optional
from uuid import UUID from uuid import UUID
from orchestrator import workflow from orchestrator import workflow
from orchestrator.db import ProductTable
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, done, init, step from orchestrator.workflow import StepList, done, init, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from gso import repository
from gso.products.product_blocks import router as router_pb from gso.products.product_blocks import router as router_pb
from gso.products.product_blocks.router import RouterRole, RouterVendor from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_types import router from gso.products.product_types import router
from gso.products.product_types.router import RouterInactive from gso.products.product_types.router import RouterInactive
from gso.services.crm import get_customer_by_name from gso.products.product_types.site import Site
from gso.services.subscriptions import get_site_by_name from gso.services.crm import get_customer_id_by_name
def _get_site_by_name(site_name: str) -> Site:
"""Get a site by its name.
Args:
----
site_name (str): The name of the site.
"""
subscription = repository.active_site_subscription_by_name(site_name)
if not subscription:
raise ValueError(f"Site with name {site_name} not found.")
return Site.from_subscription(subscription.subscription_id)
@step("Create subscription") @step("Create subscription")
def create_subscription(customer: str) -> State: def create_subscription(customer: str) -> State:
customer_id: UUID = get_customer_by_name(customer)["id"] customer_id: str = get_customer_id_by_name(customer)
product_id: UUID = ProductTable.query.filter_by(name="Router").first().product_id product_id: UUID = repository.router_product_id()
subscription = RouterInactive.from_product_id(product_id, customer_id) subscription = RouterInactive.from_product_id(product_id, customer_id)
return { return {
...@@ -72,7 +86,7 @@ def initialize_subscription( ...@@ -72,7 +86,7 @@ def initialize_subscription(
) -> State: ) -> State:
subscription.router.router_ts_port = ts_port subscription.router.router_ts_port = ts_port
subscription.router.router_vendor = router_vendor subscription.router.router_vendor = router_vendor
subscription.router.router_site = get_site_by_name(router_site).site subscription.router.router_site = _get_site_by_name(router_site).site
fqdn = ( fqdn = (
f"{hostname}.{subscription.router.router_site.site_name.lower()}." f"{hostname}.{subscription.router.router_site.site_name.lower()}."
f"{subscription.router.router_site.site_country_code.lower()}" f"{subscription.router.router_site.site_country_code.lower()}"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment