Skip to content
Snippets Groups Projects
Commit 3adb6022 authored by Neda Moeini's avatar Neda Moeini
Browse files

Added imported opengear product and workflow- Partially.

parent b5ad6acc
No related branches found
No related tags found
1 merge request!225Feature/NAT 459 - Opengear import workflow
Pipeline #87379 failed
......@@ -11,7 +11,7 @@ from pydantic_forms.types import strEnum
from gso.products.product_types.iptrunk import ImportedIptrunk, Iptrunk
from gso.products.product_types.lan_switch_interconnect import LanSwitchInterconnect
from gso.products.product_types.office_router import ImportedOfficeRouter, OfficeRouter
from gso.products.product_types.opengear import Opengear
from gso.products.product_types.opengear import Opengear, ImportedOpengear
from gso.products.product_types.pop_vlan import PopVlan
from gso.products.product_types.router import ImportedRouter, Router
from gso.products.product_types.site import ImportedSite, Site
......@@ -36,6 +36,7 @@ class ProductName(strEnum):
IMPORTED_SUPER_POP_SWITCH = "Imported super PoP switch"
IMPORTED_OFFICE_ROUTER = "Imported office router"
OPENGEAR = "Opengear"
IMPORTED_OPENGEAR = "Imported Opengear"
class ProductType(strEnum):
......@@ -55,6 +56,7 @@ class ProductType(strEnum):
IMPORTED_SUPER_POP_SWITCH = ImportedSuperPopSwitch.__name__
IMPORTED_OFFICE_ROUTER = ImportedOfficeRouter.__name__
OPENGEAR = Opengear.__name__
IMPORTED_OPENGEAR = Opengear.__name__
SUBSCRIPTION_MODEL_REGISTRY.update(
......@@ -73,5 +75,6 @@ SUBSCRIPTION_MODEL_REGISTRY.update(
ProductName.IMPORTED_SUPER_POP_SWITCH.value: ImportedSuperPopSwitch,
ProductName.IMPORTED_OFFICE_ROUTER.value: ImportedOfficeRouter,
ProductName.OPENGEAR.value: Opengear,
ProductName.IMPORTED_OPENGEAR.value: ImportedOpengear,
},
)
......@@ -22,3 +22,17 @@ class Opengear(OpengearProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
"""An Opengear that is currently active."""
opengear: OpengearBlock
class ImportedOpengearInactive(SubscriptionModel, is_base=True):
"""An imported, inactive Opengear."""
opengear: OpengearBlockInactive
class ImportedOpengear(
ImportedOpengearInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE]
):
"""An imported Opengear that is currently active."""
opengear: OpengearBlock
\ No newline at end of file
"""A creation workflow that adds an existing opengear to the service database."""
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, done, init, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic import ConfigDict
from gso.products import ProductName
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import ImportedRouterInactive
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name, get_site_by_name
from gso.utils.helpers import generate_fqdn
from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor
@step("Create subscription")
def create_subscription(partner: str) -> State:
"""Create a new subscription object."""
partner_id = get_partner_by_name(partner)["partner_id"]
product_id = get_product_id_by_name(ProductName.IMPORTED_OPENGEAR)
subscription = ImportedRouterInactive.from_product_id(product_id, partner_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint."""
class ImportRouter(FormPage):
model_config = ConfigDict(title="Import Router")
partner: str
router_site: str
hostname: str
ts_port: int
router_vendor: Vendor
router_role: RouterRole
router_lo_ipv4_address: IPv4AddressType
router_lo_ipv6_address: IPv6AddressType
router_lo_iso_address: str
user_input = yield ImportRouter
return user_input.dict()
@step("Initialize subscription")
def initialize_subscription(
subscription: ImportedRouterInactive,
hostname: str,
ts_port: PortNumber,
router_site: str,
router_role: RouterRole,
router_vendor: Vendor,
router_lo_ipv4_address: IPv4AddressType | None = None,
router_lo_ipv6_address: IPv6AddressType | None = None,
router_lo_iso_address: str | None = None,
) -> State:
"""Initialise the router subscription using input data."""
subscription.router.router_ts_port = ts_port
router_site_obj = get_site_by_name(router_site).site
subscription.router.router_site = router_site_obj
fqdn = generate_fqdn(hostname, router_site_obj.site_name, router_site_obj.site_country_code)
subscription.router.router_fqdn = fqdn
subscription.router.router_role = router_role
subscription.router.router_access_via_ts = False
subscription.description = f"Router {fqdn}"
subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
subscription.router.router_lo_iso_address = router_lo_iso_address
subscription.router.vendor = router_vendor
return {"subscription": subscription}
@workflow(
"Import router",
initial_input_form=initial_input_form_generator,
target=Target.CREATE,
)
def create_imported_router() -> StepList:
"""Import a router without provisioning it."""
return (
init
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""A modification workflow for setting a new :term:`ISIS` metric for an IP trunk."""
from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products import ProductName
from gso.products.product_types.router import ImportedRouter, Router
from gso.services.subscriptions import get_product_id_by_name
@step("Create new router subscription")
def import_router_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedRouter subscription, and turn it into a Router subscription."""
old_router = ImportedRouter.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.ROUTER)
new_subscription = Router.from_other_product(old_router, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
@workflow("Import Router", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_router() -> StepList:
"""Modify an ImportedRouter subscription into a Router subscription to complete the import."""
return init >> store_process_subscription(Target.MODIFY) >> unsync >> import_router_subscription >> resync >> done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment