Newer
Older
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.router import Router
from gso.services import infoblox
logger = logging.getLogger(__name__)
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
class TerminateForm(FormPage):
termination_label: Label = (
"Please confirm whether configuration should get removed from the router, and whether IPAM resources should"
" be released." # type: ignore
remove_configuration: bool = True
clean_up_ipam: bool = True
user_input = yield TerminateForm
return user_input.dict()
@step("Deprovision loopback IPs from IPAM")
def deprovision_loopback_ips(subscription: Router) -> dict:
infoblox.delete_host_by_ip(ipaddress.IPv4Address(subscription.router.router_lo_ipv4_address))
return {"subscription": subscription}
@step("Deprovision SI interface network from IPAM")
def deprovision_si_ips(subscription: Router) -> dict:
infoblox.delete_network(ipaddress.IPv4Network(subscription.router.router_si_ipv4_network))
return {"subscription": subscription}
@step("Deprovision IAS LT interfaces from IPAM")
def deprovision_lt_ips(subscription: Router) -> dict:
infoblox.delete_network(ipaddress.IPv4Network(subscription.router.router_ias_lt_ipv4_network))
infoblox.delete_network(ipaddress.IPv6Network(subscription.router.router_ias_lt_ipv6_network))
return {"subscription": subscription}
@step("Remove configuration from router")
def remove_config_from_router() -> None:
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE,
)
def terminate_router() -> StepList:
run_ipam_steps = conditional(lambda state: state["clean_up_ipam"])
run_config_steps = conditional(lambda state: state["remove_configuration"])
run_ias_removal = conditional(lambda state: state["subscription"]["router"]["router_is_ias_connected"])
init >> deprovision_loopback_ips >> run_ias_removal(deprovision_si_ips) >> run_ias_removal(deprovision_lt_ips)
return (
init
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> run_ipam_steps(ipam_steps)
>> run_config_steps(remove_config_from_router)
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)