from ipaddress import IPv4Network, IPv6Network from typing import Any # noinspection PyProtectedMember from orchestrator.forms import FormPage from orchestrator.forms.validators import Choice from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr from orchestrator.workflow import StepList, conditional, done, init, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form from pydantic import validator from gso.products.product_blocks.router import PortNumber, RouterRole, RouterVendor, generate_fqdn from gso.products.product_types.router import RouterInactive, RouterProvisioning from gso.products.product_types.site import Site from gso.services import infoblox, provisioning_proxy, subscriptions from gso.services.crm import customer_selector from gso.services.netbox_client import NetboxClient from gso.services.provisioning_proxy import pp_interaction from gso.utils.helpers import iso_from_ipv4 def _site_selector() -> Choice: site_subscriptions = {} for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"]): site_subscriptions[str(site["subscription_id"])] = site["description"] # noinspection PyTypeChecker return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items())) # type: ignore[arg-type] def initial_input_form_generator(product_name: str) -> FormGenerator: class CreateRouterForm(FormPage): class Config: title = product_name tt_number: str customer: customer_selector() # type: ignore[valid-type] router_site: _site_selector() # type: ignore[valid-type] hostname: str ts_port: PortNumber router_vendor: RouterVendor router_role: RouterRole is_ias_connected: bool | None = False @validator("hostname", allow_reuse=True) def hostname_must_be_available(cls, hostname: str, **kwargs: dict[str, Any]) -> str: router_site = kwargs["values"].get("router_site") if not router_site: raise ValueError("Please select a site before setting the hostname.") selected_site = Site.from_subscription(router_site).site input_fqdn = generate_fqdn(hostname, selected_site.site_name, selected_site.site_country_code) if not infoblox.hostname_available(f"lo0.{input_fqdn}"): raise ValueError(f'FQDN "{input_fqdn}" is not available.') return hostname user_input = yield CreateRouterForm return user_input.dict() @step("Create subscription") def create_subscription(product: UUIDstr, customer: UUIDstr) -> State: subscription = RouterInactive.from_product_id(product, customer) return { "subscription": subscription, "subscription_id": subscription.subscription_id, } @step("Initialize subscription") def initialize_subscription( subscription: RouterInactive, hostname: str, ts_port: PortNumber, router_vendor: RouterVendor, router_site: str, router_role: RouterRole, ) -> State: subscription.router.router_ts_port = ts_port subscription.router.router_vendor = router_vendor subscription.router.router_site = Site.from_subscription(router_site).site fqdn = generate_fqdn( hostname, subscription.router.router_site.site_name, subscription.router.router_site.site_country_code ) subscription.router.router_fqdn = fqdn subscription.router.router_role = router_role subscription.router.router_access_via_ts = True subscription.description = f"Router {fqdn}" subscription = RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING) return {"subscription": subscription} @step("Allocate loopback interfaces in IPAM") def ipam_allocate_loopback(subscription: RouterProvisioning, is_ias_connected: bool) -> State: fqdn = subscription.router.router_fqdn loopback_v4, loopback_v6 = infoblox.allocate_host(f"lo0.{fqdn}", "LO", [fqdn], str(subscription.subscription_id)) subscription.router.router_lo_ipv4_address = loopback_v4 subscription.router.router_lo_ipv6_address = loopback_v6 subscription.router.router_lo_iso_address = iso_from_ipv4(subscription.router.router_lo_ipv4_address) subscription.router.router_is_ias_connected = is_ias_connected return {"subscription": subscription} @step("Allocate IAS connection in IPAM") def ipam_allocate_ias_networks(subscription: RouterProvisioning) -> State: fqdn = subscription.router.router_fqdn subscription.router.router_si_ipv4_network = infoblox.allocate_v4_network( "SI", f"SI for {fqdn} - {subscription.subscription_id}" ) subscription.router.router_ias_lt_ipv4_network = infoblox.allocate_v4_network( "LT_IAS", f"LT for {fqdn} - {subscription.subscription_id}" ) subscription.router.router_ias_lt_ipv6_network = infoblox.allocate_v6_network( "LT_IAS", f"LT for {fqdn} - {subscription.subscription_id}" ) return {"subscription": subscription} @step("Provision router [DRY RUN]") def provision_router_dry(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State: provisioning_proxy.provision_router(subscription, process_id, tt_number) return { "subscription": subscription, "label_text": ( "Dry run for the deployment of base config on a new router. Deployment is done by the" " provisioning proxy, please wait for the results to come back before continuing." ), } @step("Provision router [FOR REAL]") def provision_router_real(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State: provisioning_proxy.provision_router(subscription, process_id, tt_number, False) return { "subscription": subscription, "label_text": ( "Deployment of base config for a new router. Deployment is being taken care of by the" " provisioning proxy, please wait for the results to come back before continuing." ), } @step("Create NetBox Device") def create_netbox_device(subscription: RouterProvisioning) -> State: if subscription.router.router_vendor == RouterVendor.NOKIA: NetboxClient().create_device( subscription.router.router_fqdn, str(subscription.router.router_site.site_tier), # type: ignore[union-attr] ) return {"subscription": subscription, "label_text": "Creating NetBox device"} return {"subscription": subscription, "label_text": "Skipping NetBox device creation for Juniper router."} @step("Verify IPAM resources for loopback interface") def verify_ipam_loopback(subscription: RouterProvisioning) -> State: host_record = infoblox.find_host_by_fqdn(f"lo0.{subscription.router.router_fqdn}") if not host_record or str(subscription.subscription_id) not in host_record.comment: return {"ipam_warning": "Loopback record is incorrectly configured in IPAM, please investigate this manually!"} return {"subscription": subscription} @step("Verify IPAM resources for IAS/LT networks") def verify_ipam_ias(subscription: RouterProvisioning) -> State: si_ipv4_network = infoblox.find_network_by_cidr(IPv4Network(subscription.router.router_si_ipv4_network)) ias_lt_ipv4_network = infoblox.find_network_by_cidr(IPv4Network(subscription.router.router_ias_lt_ipv4_network)) ias_lt_ipv6_network = infoblox.find_network_by_cidr(IPv6Network(subscription.router.router_ias_lt_ipv6_network)) new_state = {} if not si_ipv4_network or str(subscription.subscription_id) not in si_ipv4_network.comment: new_state = { "ipam_si_warning": f"SI IPv4 network expected at {subscription.router.router_si_ipv4_network}, " f"but it was not found or misconfigured, please investigate and adjust if necessary." } if not ias_lt_ipv4_network or str(subscription.subscription_id) not in ias_lt_ipv4_network.comment: new_state = new_state | { "ipam_ias_lt_ipv4_warning": "IAS/LT IPv4 network expected at " f"{subscription.router.router_ias_lt_ipv4_network}, but it was not found or misconfigured, please " "investigate and adjust if necessary." } if not ias_lt_ipv6_network or str(subscription.subscription_id) not in ias_lt_ipv6_network.comment: new_state = new_state | { "ipam_ias_lt_ipv6_warning": f"IAS/LT IPv6 network expected at " f"{subscription.router.router_ias_lt_ipv6_network}, but it was not found or misconfigured, please " "investigate and adjust if necessary." } return new_state @workflow( "Create router", initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), target=Target.CREATE, ) def create_router() -> StepList: should_allocate_ias = conditional(lambda state: state["is_ias_connected"]) return ( init >> create_subscription >> store_process_subscription(Target.CREATE) >> initialize_subscription >> ipam_allocate_loopback >> should_allocate_ias(ipam_allocate_ias_networks) >> pp_interaction(provision_router_dry, 3) >> pp_interaction(provision_router_real, 3) >> verify_ipam_loopback >> should_allocate_ias(verify_ipam_ias) >> create_netbox_device >> set_status(SubscriptionLifecycle.ACTIVE) >> resync >> done )