-
Karel van Klink authoredKarel van Klink authored
terminate_router.py 3.25 KiB
"""A workflow that terminates a router."""
import ipaddress
import logging
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import (
resync,
set_status,
store_process_subscription,
unsync,
)
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.router import Router, RouterVendor
from gso.services import infoblox
from gso.services.netbox_client import NetboxClient
logger = logging.getLogger(__name__)
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Let the operator decide whether to delete configuration on the router, and clear up :term:`IPAM` resources."""
Router.from_subscription(subscription_id)
class TerminateForm(FormPage):
termination_label: Label = (
"Please confirm whether configuration should get removed from the router, and whether IPAM resources should"
" be released." # type: ignore[assignment]
)
tt_number: str
remove_configuration: bool = True
clean_up_ipam: bool = True
user_input = yield TerminateForm
return user_input.dict()
@step("Deprovision loopback IPs from IPAM")
def deprovision_loopback_ips(subscription: Router) -> dict:
"""Clear up the loopback addresses from :term:`IPAM`."""
infoblox.delete_host_by_ip(ipaddress.IPv4Address(subscription.router.router_lo_ipv4_address))
return {"subscription": subscription}
@step("Remove configuration from router")
def remove_config_from_router() -> None:
"""Remove configuration from the router, first as a dry run.
FIXME: Add actual content
TODO: update unit test accordingly
"""
@step("Remove Device from NetBox")
def remove_device_from_netbox(subscription: Router) -> dict[str, Router]:
"""Remove the device from Netbox."""
if subscription.vendor == RouterVendor.NOKIA:
NetboxClient().delete_device(subscription.router.router_fqdn)
return {"subscription": subscription}
@workflow(
"Terminate router",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE,
)
def terminate_router() -> StepList:
"""Terminate a router subscription.
* Let the operator decide whether to delete :term:`IPAM` resources, and remove configuration from the router
* Clear up :term:`IPAM` resources, if selected by the operator
* Disable and delete configuration on the router, if selected by the operator
* Mark the subscription as terminated in the service database
"""
run_ipam_steps = conditional(lambda state: state["clean_up_ipam"])
run_config_steps = conditional(lambda state: state["remove_configuration"])
return (
init
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> run_ipam_steps(deprovision_loopback_ips)
>> run_config_steps(remove_config_from_router)
>> remove_device_from_netbox
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)