"""Workflow that creates a new router subscription.

The steps defined here create the subscription, fill in the router product block, allocate
resources in IPAM through the ``infoblox`` service, hand the router over to the provisioning
proxy (a dry run followed by a real run) and check the IPAM records afterwards.
"""
from ipaddress import IPv4Network, IPv6Network
from typing import Any

# noinspection PyProtectedMember
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator

from gso.products.product_blocks.router import RouterRole, RouterVendor, generate_fqdn
from gso.products.product_types.router import RouterInactive, RouterProvisioning
from gso.products.product_types.site import Site
from gso.products.shared import PortNumber
from gso.services import infoblox, provisioning_proxy, subscriptions
from gso.services.provisioning_proxy import pp_interaction
from gso.workflows.utils import customer_selector, iso_from_ipv4


def _site_selector() -> Choice:
    """Build the ``Choice`` used to pick one of the active site subscriptions.

    Every entry is named after the stringified subscription ID and carries the matching
    ``(subscription ID, description)`` pair.
    """
    sites_by_id = {
        str(subscription_id): description
        for subscription_id, description in subscriptions.get_active_site_subscriptions(
            fields=["subscription_id", "description"]
        )
    }

    # Iterating the dictionary yields its keys, so every key is paired with its own item.
    # noinspection PyTypeChecker
    return Choice("Select a site", zip(sites_by_id, sites_by_id.items()))  # type: ignore


def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Present the router creation form and return the submitted values as a dictionary.

    :param product_name: Name of the product, shown as the title of the form page.
    :return: The user input, converted with ``.dict()``.
    """

    # Input that is collected from the operator before the workflow starts.
    class CreateRouterForm(FormPage):
        class Config:
            title = product_name

        tt_number: str
        customer: customer_selector()  # type: ignore
        router_site: _site_selector()  # type: ignore
        hostname: str
        ts_port: PortNumber
        router_vendor: RouterVendor
        router_role: RouterRole
        is_ias_connected: bool | None = False

        @validator("hostname", allow_reuse=True)
        def hostname_must_be_available(cls, hostname: str, **kwargs: dict[str, Any]) -> str:
            """Accept the hostname only if a site is selected and the resulting FQDN is free in Infoblox.

            :raises ValueError: When no site has been selected, or when ``lo0.<fqdn>`` is not available.
            """
            chosen_site_id = kwargs["values"].get("router_site")
            if not chosen_site_id:
                raise ValueError("Please select a site before setting the hostname.")

            site = Site.from_subscription(chosen_site_id).site
            candidate_fqdn = generate_fqdn(hostname, site.site_name, site.site_country_code)
            if infoblox.hostname_available(f"lo0.{candidate_fqdn}"):
                return hostname

            raise ValueError(f'FQDN "{candidate_fqdn}" is not available.')

    form_data = yield CreateRouterForm

    return form_data.dict()


@step("Create subscription")
def create_subscription(product: UUIDstr, customer: UUIDstr) -> State:
    """Instantiate an inactive router subscription for the given product and customer."""
    inactive_router = RouterInactive.from_product_id(product, customer)

    return {
        "subscription": inactive_router,
        "subscription_id": inactive_router.subscription_id,
    }


@step("Initialize subscription")
def initialize_subscription(
    subscription: RouterInactive,
    hostname: str,
    ts_port: PortNumber,
    router_vendor: RouterVendor,
    router_site: str,
    router_role: RouterRole,
) -> State:
    """Fill in the router product block from the form input and move the subscription to provisioning.

    The FQDN is generated from the hostname together with the name and country code of the
    selected site, and is also used in the subscription description.
    """
    router = subscription.router
    router.router_ts_port = ts_port
    router.router_vendor = router_vendor
    router.router_site = Site.from_subscription(router_site).site

    # Read the site back from the product block, so the FQDN is based on what was stored.
    site = router.router_site
    fqdn = generate_fqdn(hostname, site.site_name, site.site_country_code)
    router.router_fqdn = fqdn
    router.router_role = router_role
    router.router_access_via_ts = True
    subscription.description = f"Router {fqdn}"

    provisioning_router = RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)

    return {"subscription": provisioning_router}


@step("Allocate loopback interfaces in IPAM")
def ipam_allocate_loopback(subscription: RouterProvisioning, is_ias_connected: bool) -> State:
    """Allocate a host record for ``lo0.<fqdn>`` and store the loopback addresses on the router.

    The subscription ID is passed to Infoblox as the last argument of ``allocate_host``; the
    ISO address is derived from the stored IPv4 loopback address.
    """
    router = subscription.router
    fqdn = router.router_fqdn
    ipv4_address, ipv6_address = infoblox.allocate_host(
        f"lo0.{fqdn}", "LO", [fqdn], str(subscription.subscription_id)
    )

    router.router_lo_ipv4_address = ipv4_address
    router.router_lo_ipv6_address = ipv6_address
    router.router_lo_iso_address = iso_from_ipv4(router.router_lo_ipv4_address)
    router.router_is_ias_connected = is_ias_connected

    return {"subscription": subscription}


@step("Allocate IAS connection in IPAM")
def ipam_allocate_ias_networks(subscription: RouterProvisioning) -> State:
    """Allocate the SI IPv4 network and the IAS/LT IPv4 and IPv6 networks for the router."""
    router = subscription.router
    owner = f"{router.router_fqdn} - {subscription.subscription_id}"
    lt_description = f"LT for {owner}"

    router.router_si_ipv4_network = infoblox.allocate_v4_network("SI", f"SI for {owner}")
    router.router_ias_lt_ipv4_network = infoblox.allocate_v4_network("LT_IAS", lt_description)
    router.router_ias_lt_ipv6_network = infoblox.allocate_v6_network("LT_IAS", lt_description)

    return {"subscription": subscription}


@step("Provision router [DRY RUN]")
def provision_router_dry(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to provision the router, using the proxy's default arguments."""
    provisioning_proxy.provision_router(subscription, process_id, tt_number)

    label_text = (
        "Dry run for the deployment of base config on a new router. Deployment is done by the"
        " provisioning proxy, please wait for the results to come back before continuing."
    )

    return {"subscription": subscription, "label_text": label_text}


@step("Provision router [FOR REAL]")
def provision_router_real(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to provision the router, passing ``False`` as fourth argument.

    NOTE(review): the fourth positional argument presumably switches off the dry run - confirm
    against ``provisioning_proxy.provision_router``.
    """
    provisioning_proxy.provision_router(subscription, process_id, tt_number, False)

    label_text = (
        "Deployment of base config for a new router. Deployment is being taken care of by the"
        " provisioning proxy, please wait for the results to come back before continuing."
    )

    return {"subscription": subscription, "label_text": label_text}


@step("Verify IPAM resources for loopback interface")
def verify_ipam_loopback(subscription: RouterProvisioning) -> State:
    """Check that the ``lo0.<fqdn>`` host record exists and mentions this subscription in its comment.

    :return: The subscription when the record checks out, otherwise only an ``ipam_warning`` message.
    """
    host_record = infoblox.find_host_by_fqdn(f"lo0.{subscription.router.router_fqdn}")

    if host_record and str(subscription.subscription_id) in host_record.comment:
        return {"subscription": subscription}

    return {"ipam_warning": "Loopback record is incorrectly configured in IPAM, please investigate this manually!"}


@step("Verify IPAM resources for IAS/LT networks")
def verify_ipam_ias(subscription: RouterProvisioning) -> State:
    """Check that the SI and IAS/LT networks exist in IPAM and mention this subscription.

    :return: One warning entry per network that is missing or whose comment does not contain
        the subscription ID; an empty dictionary when all three networks check out.
    """
    router = subscription.router
    subscription_id = str(subscription.subscription_id)

    # All three lookups are done up front, before any of the results is inspected.
    si_ipv4_record = infoblox.find_network_by_cidr(IPv4Network(router.router_si_ipv4_network))
    lt_ipv4_record = infoblox.find_network_by_cidr(IPv4Network(router.router_ias_lt_ipv4_network))
    lt_ipv6_record = infoblox.find_network_by_cidr(IPv6Network(router.router_ias_lt_ipv6_network))

    # (state key, label used in the message, record found in IPAM, network that was expected)
    checks = (
        ("ipam_si_warning", "SI IPv4", si_ipv4_record, router.router_si_ipv4_network),
        ("ipam_ias_lt_ipv4_warning", "IAS/LT IPv4", lt_ipv4_record, router.router_ias_lt_ipv4_network),
        ("ipam_ias_lt_ipv6_warning", "IAS/LT IPv6", lt_ipv6_record, router.router_ias_lt_ipv6_network),
    )

    findings: State = {}
    for state_key, label, record, expected_network in checks:
        if record and subscription_id in record.comment:
            continue

        findings[state_key] = (
            f"{label} network expected at {expected_network}, "
            "but it was not found or misconfigured, please investigate and adjust if necessary."
        )

    return findings


@workflow(
    "Create router",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_router() -> StepList:
    """Assemble the list of steps that make up the router creation workflow.

    The IAS allocation and verification steps are only included when ``is_ias_connected`` in
    the workflow state is truthy.
    """
    if_ias_connected = conditional(lambda state: state["is_ias_connected"])

    # NOTE(review): the meaning of the second argument (3) to ``pp_interaction`` is not visible
    # in this module - confirm against ``gso.services.provisioning_proxy``.
    return (
        init
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> ipam_allocate_loopback
        >> if_ias_connected(ipam_allocate_ias_networks)
        >> pp_interaction(provision_router_dry, 3)
        >> pp_interaction(provision_router_real, 3)
        >> verify_ipam_loopback
        >> if_ias_connected(verify_ipam_ias)
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )