Skip to content
Snippets Groups Projects
Select Git revision
  • 021c1e621d89a28bd9ab4bd9f0f0969c38a5dae8
  • develop default protected
  • renovate/major-angular-monorepo
  • renovate/primeng-themes-20.x
  • renovate/major-angularmaterial-monorepo
  • renovate/major-angular-cli-monorepo
  • release/1.8.0 protected
  • renovate/typescript-5.x
  • renovate/major-jasmine-monorepo
  • enable-paging-support-for-list-of-instances-endpoint
  • renovate/bootstrap-5.x
  • renovate/karma-jasmine-html-reporter-2.x
  • renovate/formio-angular-9.x
  • renovate/formio-angular-7.x
  • release/1.9.0 protected
  • release/1.7.1 protected
  • release/1.7.0 protected
  • 240-visual-indicator-to-track-bulk-deployment-progress
  • release/1.6.5 protected
  • release/1.5.3 protected
  • release/1.5.2 protected
  • v1.8.1-alfa protected
  • v1.8.0 protected
  • v1.7.1 protected
  • v1.7.0 protected
  • v1.6.5 protected
  • v1.6.4 protected
  • v1.6.3 protected
  • v1.6.2 protected
  • v1.6.1 protected
  • v1.6.0 protected
31 results

ssh-keys.component.ts

Blame
  • terminate_router.py 4.04 KiB
    import ipaddress
    import logging
    
    from orchestrator.forms import FormPage
    from orchestrator.forms.validators import Label
    from orchestrator.targets import Target
    from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
    from orchestrator.workflow import StepList, conditional, done, init, step, workflow
    from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
    from orchestrator.workflows.utils import wrap_modify_initial_input_form
    
    from gso.products.product_types.router import Router
    from gso.services import ipam
    from gso.services.ipam import HostAddresses, V4ServiceNetwork, V6ServiceNetwork
    
    logger = logging.getLogger(__name__)
    
    
    def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
        """Ask the operator what a router termination should clean up.

        Loads the router subscription identified by ``subscription_id``, yields a
        single confirmation form and returns the submitted values as a dict.
        """
        # The loaded model is not used any further in this generator.
        Router.from_subscription(subscription_id)

        class TerminateForm(FormPage):
            termination_label: Label = (
                "Please confirm whether configuration should get removed from the router, and whether IPAM resources should"
                " be released."  # type: ignore
            )
            remove_configuration: bool = True
            clean_up_ipam: bool = True

        submitted_form = yield TerminateForm
        return submitted_form.dict()
    
    
    @step("Deprovision loopback IPs from IPAM/DNS")
    def deprovision_loopback_ips(subscription: Router) -> dict[str, HostAddresses]:
        """Delete the router's loopback host record through the IPAM service.

        The host name is made of the first three labels of the router FQDN. The
        record passed to ``ipam.delete_host`` is that name prefixed with ``lo0.``,
        with the bare host name given as its CNAME alias.

        Returns the value of ``ipam.delete_host`` under the ``addresses`` key.
        """
        router = subscription.router
        loopback_addresses = ipam.HostAddresses(
            v4=ipaddress.IPv4Address(router.router_lo_ipv4_address),
            v6=ipaddress.IPv6Address(router.router_lo_ipv6_address),
        )

        labels = router.router_fqdn.split(".")
        # Explicit indexing (not slicing): an FQDN with fewer than three labels still raises IndexError.
        hostname = f"{labels[0]}.{labels[1]}.{labels[2]}"

        deleted_addresses = ipam.delete_host(
            hostname=f"lo0.{hostname}",
            host_addresses=loopback_addresses,
            cname_aliases=[hostname],
            service_type="LO",
        )
        return {"addresses": deleted_addresses}
    
    
    @step("Deprovision SI- interface  IPs from IPAM/DNS")
    def deprovision_si_ips(subscription: Router) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
        """Delete the router's SI IPv4 network through the IPAM service.

        Returns the value of ``ipam.delete_network`` under the ``service_network`` key.
        """
        si_network = ipaddress.ip_network(subscription.router.router_si_ipv4_network)
        deleted_network = ipam.delete_network(network=si_network, service_type="SI")  # type: ignore
        return {"service_network": deleted_network}
    
    
    @step("Deprovision LT- interface (IAS) IPs from IPAM/DNS")
    def deprovision_lt_ips(subscription: Router) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
        """Delete the router's IAS LT IPv4 and IPv6 networks through the IPAM service.

        The IPv4 network is removed first, then the IPv6 one. Both results of
        ``ipam.delete_network`` are returned, keyed ``service_network_v4`` and
        ``service_network_v6``.
        """
        router = subscription.router

        lt_network_v4 = ipaddress.ip_network(router.router_ias_lt_ipv4_network)
        deleted_v4 = ipam.delete_network(network=lt_network_v4, service_type="LT_IAS")  # type: ignore

        lt_network_v6 = ipaddress.ip_network(router.router_ias_lt_ipv6_network)
        deleted_v6 = ipam.delete_network(network=lt_network_v6, service_type="LT_IAS")  # type: ignore

        return {"service_network_v4": deleted_v4, "service_network_v6": deleted_v6}
    
    
    @step("Remove configuration from router")
    def remove_config_from_router() -> None:
        """Placeholder step: it currently performs no action and returns nothing."""
    
    
    def _router_field_is_set(state: dict, field_name: str) -> bool:
        """Tell whether the router block of the subscription in ``state`` has a value for ``field_name``.

        The steps of this workflow reach the router through ``subscription.router``,
        so the field is looked up at ``state["subscription"]["router"]``.
        NOTE(review): assumes the state holds the subscription as nested dicts at
        the time the predicate runs - confirm against orchestrator-core.

        Any other shape (missing keys, non-dict values) yields ``False`` so the
        guarded step is skipped instead of failing. An empty or ``None`` value also
        yields ``False``: there is no network to hand to the IPAM service then.
        """
        subscription = state.get("subscription")
        router = subscription.get("router") if isinstance(subscription, dict) else None
        return isinstance(router, dict) and bool(router.get(field_name))


    @workflow(
        "Terminate router",
        initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
        target=Target.TERMINATE,
    )
    def terminate_router() -> StepList:
        """Build the step list of the "Terminate router" workflow.

        The IPAM clean-up and the configuration removal each run only when the
        matching form flag is set; both default to running when the flag is absent.
        Within the IPAM clean-up, the SI and IAS LT removals additionally run only
        when the router has a value for the corresponding network field.
        """
        run_ipam_steps = conditional(lambda state: state.get("clean_up_ipam", True))
        run_config_steps = conditional(lambda state: state.get("remove_configuration", True))
        # These predicates receive the workflow state, like the two above. Looking
        # for "router" directly in the state never matched, so the SI and LT
        # removals were always skipped.
        run_ias_removal = conditional(lambda state: _router_field_is_set(state, "router_ias_lt_ipv4_network"))
        run_si_removal = conditional(lambda state: _router_field_is_set(state, "router_si_ipv4_network"))

        ipam_steps = (
            StepList([deprovision_loopback_ips])
            >> run_si_removal(deprovision_si_ips)
            >> run_ias_removal(deprovision_lt_ips)
        )

        return (
            init
            >> store_process_subscription(Target.TERMINATE)
            >> unsync
            >> run_ipam_steps(ipam_steps)
            >> run_config_steps(remove_config_from_router)
            >> set_status(SubscriptionLifecycle.TERMINATED)
            >> resync
            >> done
        )