-
Karel van Klink authored
terminate_router.py 4.04 KiB
import ipaddress
import logging
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.router import Router
from gso.services import ipam
from gso.services.ipam import HostAddresses, V4ServiceNetwork, V6ServiceNetwork
logger = logging.getLogger(__name__)
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Show the termination confirmation form and return the submitted values.

    The subscription is loaded through ``Router.from_subscription`` before the form is built; the loaded object is
    not used afterwards (presumably the call only serves as a check that the id refers to a router -- confirm).

    :param subscription_id: Id of the router subscription that is about to be terminated.
    :return: The submitted form as a dict with the form's fields, including ``remove_configuration`` and
        ``clean_up_ipam``.
    """
    Router.from_subscription(subscription_id)

    class TerminateForm(FormPage):
        # Explanatory text shown above the two checkboxes.
        termination_label: Label = (
            "Please confirm whether configuration should get removed from the router, and whether IPAM resources should"
            " be released."  # type: ignore
        )
        # Both options default to enabled.
        remove_configuration: bool = True
        clean_up_ipam: bool = True

    submitted_form = yield TerminateForm

    return submitted_form.dict()
@step("Deprovision loopback IPs from IPAM/DNS")
def deprovision_loopback_ips(subscription: Router) -> dict[str, HostAddresses]:
    """Delete the router's loopback host entry through ``ipam.delete_host``.

    The host name passed to IPAM is ``lo0.`` followed by the first three dot-separated labels of the router FQDN;
    those same three labels are passed as the single CNAME alias.

    :param subscription: Router subscription providing the loopback addresses and the FQDN.
    :return: State update with the result of ``ipam.delete_host`` stored under ``"addresses"``.
    :raises IndexError: If the router FQDN consists of fewer than three labels.
    """
    loopback_addresses = ipam.HostAddresses(
        v4=ipaddress.IPv4Address(subscription.router.router_lo_ipv4_address),
        v6=ipaddress.IPv6Address(subscription.router.router_lo_ipv6_address),
    )

    # Only the first three labels of the FQDN make up the name used in IPAM.
    labels = subscription.router.router_fqdn.split(".")
    short_name = f"{labels[0]}.{labels[1]}.{labels[2]}"

    deleted_addresses = ipam.delete_host(
        hostname=f"lo0.{short_name}",
        host_addresses=loopback_addresses,
        cname_aliases=[short_name],
        service_type="LO",
    )

    return {"addresses": deleted_addresses}
@step("Deprovision SI- interface IPs from IPAM/DNS")
def deprovision_si_ips(subscription: Router) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
    """Delete the router's SI IPv4 network through ``ipam.delete_network``.

    :param subscription: Router subscription providing ``router_si_ipv4_network``.
    :return: State update with the result of ``ipam.delete_network`` stored under ``"service_network"``.
    """
    si_network = ipaddress.ip_network(subscription.router.router_si_ipv4_network)
    deleted_network = ipam.delete_network(network=si_network, service_type="SI")  # type: ignore

    return {"service_network": deleted_network}
@step("Deprovision LT- interface (IAS) IPs from IPAM/DNS")
def deprovision_lt_ips(subscription: Router) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
    """Delete the router's IAS LT IPv4 and IPv6 networks through ``ipam.delete_network``.

    The IPv4 network is deleted first, then the IPv6 network.

    :param subscription: Router subscription providing ``router_ias_lt_ipv4_network`` and
        ``router_ias_lt_ipv6_network``.
    :return: State update with both ``ipam.delete_network`` results, stored under ``"service_network_v4"`` and
        ``"service_network_v6"``.
    """
    deleted_networks = {}

    lt_network_v4 = ipaddress.ip_network(subscription.router.router_ias_lt_ipv4_network)
    deleted_networks["service_network_v4"] = ipam.delete_network(  # type: ignore
        network=lt_network_v4,
        service_type="LT_IAS",
    )

    lt_network_v6 = ipaddress.ip_network(subscription.router.router_ias_lt_ipv6_network)
    deleted_networks["service_network_v6"] = ipam.delete_network(  # type: ignore
        network=lt_network_v6,
        service_type="LT_IAS",
    )

    return deleted_networks
@step("Remove configuration from router")
def remove_config_from_router() -> None:
    """Placeholder step: the body is empty, so running it does not touch the router."""
def _router_field_is_set(state: dict, field_name: str) -> bool:
    """Tell whether the router block of the subscription held in ``state`` has a non-empty ``field_name``.

    NOTE(review): assumes ``state["subscription"]`` is the dict form of the subscription, with the router product
    block stored under ``"router"`` -- confirm against the state produced by ``unsync``.

    :param state: Workflow state handed to a ``conditional`` predicate.
    :param field_name: Name of the router block field to look up.
    :return: ``True`` when the field is present and truthy, ``False`` when it is missing, ``None`` or empty.
    """
    subscription = state.get("subscription") or {}
    router_block = subscription.get("router") or {}
    return bool(router_block.get(field_name))


@workflow(
    "Terminate router",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.TERMINATE,
)
def terminate_router() -> StepList:
    """Build the step list that terminates a router subscription.

    The IPAM clean-up steps run only when ``clean_up_ipam`` is truthy in the state, and the configuration removal
    step only when ``remove_configuration`` is truthy; both default to ``True`` when the key is missing. Within the
    IPAM clean-up, the SI and LT/IAS steps additionally require the corresponding IPv4 network to be set on the
    router block of the subscription.

    :return: The complete list of steps for the "Terminate router" workflow.
    """
    run_ipam_steps = conditional(lambda state: state.get("clean_up_ipam", True))
    run_config_steps = conditional(lambda state: state.get("remove_configuration", True))
    # A ``conditional`` predicate is called with the workflow state, so the router block has to be looked up
    # underneath the "subscription" key. Reading "router" straight from the state never matched, which caused
    # these two clean-up steps to be skipped on every run.
    run_ias_removal = conditional(lambda state: _router_field_is_set(state, "router_ias_lt_ipv4_network"))
    run_si_removal = conditional(lambda state: _router_field_is_set(state, "router_si_ipv4_network"))

    ipam_steps = (
        StepList([deprovision_loopback_ips])
        >> run_si_removal(deprovision_si_ips)
        >> run_ias_removal(deprovision_lt_ips)
    )

    return (
        init
        >> store_process_subscription(Target.TERMINATE)
        >> unsync
        >> run_ipam_steps(ipam_steps)
        >> run_config_steps(remove_config_from_router)
        >> set_status(SubscriptionLifecycle.TERMINATED)
        >> resync
        >> done
    )