# terminate_iptrunk.py
import ipaddress

from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products.product_types.iptrunk import Iptrunk
from gso.services import infoblox, provisioning_proxy
from gso.services.provisioning_proxy import pp_interaction
from gso.workflows.iptrunk.utils import set_isis_to_90000
    # Form page asking the operator to confirm what the termination should clean up.
    # NOTE(review): these statements are indented like a function body, but no enclosing `def` line is visible in
    # this view; presumably this is `initial_input_form_generator`, which is referenced further down - confirm.
    class TerminateForm(FormPage):
        # Explanatory text for the two options below.
        termination_label: Label = (
            "Please confirm whether configuration should get removed from the A and B sides of the trunk, and whether "
            "IPAM resources should be released."  # type: ignore
        )
        # Whether configuration should be removed from the trunk; defaults to True.
        remove_configuration: bool = True
        # Whether IPAM resources should be released; defaults to True.
        clean_up_ipam: bool = True
    # Yield the form class and receive the submitted form back.
    user_input = yield TerminateForm
    # The submitted values are returned via `.dict()`; `terminate_iptrunk` looks up the keys
    # "remove_configuration" and "clean_up_ipam" in the workflow state.
    return user_input.dict()
# Calls the provisioning proxy for the trunk's "isis_interface" object and carries a label telling the operator to
# wait until traffic has drained.
# NOTE(review): this function is not referenced in the visible part of the step list in `terminate_iptrunk`, which
# uses `set_isis_to_90000` instead - confirm whether it is still needed.
def drain_traffic_from_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
    # The final positional False is presumably a dry-run flag (compare the deprovision steps) - TODO confirm.
    provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "isis_interface", False)
        # NOTE(review): dangling dict entry - the enclosing `return {` ... `}` is not visible in this view.
        # NOTE(review): the two adjacent literals concatenate without a separating space ("...results.When ...").
        # NOTE(review): the text says 9000 while the workflow uses a step named `set_isis_to_90000` - confirm.
        "label_text": "This is setting the ISIS metric of the trunk to 9000. Press refresh to get the results."
        "When traffic is drained, confirm to continue.",
# Asks the provisioning proxy to deprovision the trunk; the label marks this variant as the "[DRY RUN]".
def deprovision_ip_trunk_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
    # The final positional argument is True here and False in `deprovision_ip_trunk_real`; together with the
    # "[DRY RUN]" / "[COMMIT]" labels this suggests it is a dry-run flag - confirm against provisioning_proxy.
    provisioning_proxy.deprovision_ip_trunk(subscription, process_id, tt_number, True)
        # NOTE(review): dangling dict entry - the enclosing `return {` ... `}` is not visible in this view.
        "label_text": "[DRY RUN] Terminating IP trunk, please refresh to get the results of the playbook.",
# Asks the provisioning proxy to deprovision the trunk; the label marks this variant as the "[COMMIT]" run.
def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
    # The final positional argument is False here and True in `deprovision_ip_trunk_dry`; together with the
    # "[DRY RUN]" / "[COMMIT]" labels this suggests it is a dry-run flag - confirm against provisioning_proxy.
    provisioning_proxy.deprovision_ip_trunk(subscription, process_id, tt_number, False)
        # NOTE(review): dangling dict entry - the enclosing `return {` ... `}` is not visible in this view.
        "label_text": "[COMMIT] Terminating IP trunk, please refresh to get the results of the playbook.",
def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> State:
    """Delete the trunk's IPv4 network from Infoblox.

    The value stored in ``subscription.iptrunk.iptrunk_ipv4_network`` is converted to an
    :class:`ipaddress.IPv4Network` and handed to ``infoblox.delete_network``.

    :param subscription: IP trunk subscription whose IPv4 network is deleted.
    :return: State update carrying the subscription under the ``"subscription"`` key.
    :raises ValueError: If the stored value is not a valid IPv4 network (raised by ``ipaddress``).
    """
    ipv4_network = ipaddress.IPv4Network(subscription.iptrunk.iptrunk_ipv4_network)
    infoblox.delete_network(ipv4_network)

    return {"subscription": subscription}
def deprovision_ip_trunk_ipv6(subscription: Iptrunk) -> State:
    """Delete the trunk's IPv6 network from Infoblox.

    The value stored in ``subscription.iptrunk.iptrunk_ipv6_network`` is converted to an
    :class:`ipaddress.IPv6Network` and handed to ``infoblox.delete_network``.

    :param subscription: IP trunk subscription whose IPv6 network is deleted.
    :return: State update carrying the subscription under the ``"subscription"`` key.
    :raises ValueError: If the stored value is not a valid IPv6 network (raised by ``ipaddress``).
    """
    ipv6_network = ipaddress.IPv6Network(subscription.iptrunk.iptrunk_ipv6_network)
    infoblox.delete_network(ipv6_network)

    return {"subscription": subscription}
    # NOTE(review): these keyword arguments look like the tail of a call whose opening line is not visible in this
    # view (presumably the `workflow` decorator imported above) - confirm.
    # Visible here: the wrapped `initial_input_form_generator` is the initial input form, the target is TERMINATE.
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.TERMINATE,
)
def terminate_iptrunk() -> StepList:
    # Builds the step list for terminating an IP trunk.
    # Each group of steps is gated on the matching key in the workflow state.
    run_config_steps = conditional(lambda state: state["remove_configuration"])
    run_ipam_steps = conditional(lambda state: state["clean_up_ipam"])
        # NOTE(review): the opening of this expression is not visible in this view (presumably `config_steps = (`,
        # given that `config_steps` is used below) - confirm.
        # The second argument 3 to `pp_interaction` is presumably an attempt/retry count - confirm.
        init
        >> pp_interaction(set_isis_to_90000, 3)
        >> pp_interaction(deprovision_ip_trunk_dry, 3)
        >> pp_interaction(deprovision_ip_trunk_real, 3)
    )
    # IPAM clean-up: the IPv4 deprovision step followed by the IPv6 one.
    ipam_steps = init >> deprovision_ip_trunk_ipv4 >> deprovision_ip_trunk_ipv6
        # NOTE(review): the start of this chain is not visible in this view. What is visible: the two conditional
        # groups, then the status is set to TERMINATED, followed by `resync` and `done`.
        >> run_config_steps(config_steps)
        >> run_ipam_steps(ipam_steps)
        >> set_status(SubscriptionLifecycle.TERMINATED)
        >> resync
        >> done