"""Workflow steps that are shared across multiple workflows.""" import json from typing import Any from orchestrator import inputstep, step from orchestrator.config.assignee import Assignee from orchestrator.types import State, UUIDstr from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, begin, conditional from pydantic import ConfigDict from pydantic_forms.core import FormPage from pydantic_forms.types import FormGenerator from pydantic_forms.validators import Label from gso.products.product_blocks.router import RouterRole from gso.products.product_types.iptrunk import Iptrunk from gso.services.lso_client import LSOState, lso_interaction from gso.settings import load_oss_params from gso.utils.helpers import generate_inventory_for_active_routers from gso.utils.shared_enums import Vendor def _deploy_base_config( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: extra_vars = { "wfo_router_json": subscription, "dry_run": dry_run, "verb": "deploy", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy base config", } return { "playbook_name": "gap_ansible/playbooks/base_config.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": extra_vars, } def _update_sdp_mesh( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: inventory = generate_inventory_for_active_routers( router_role=RouterRole.PE, router_vendor=Vendor.NOKIA, exclude_routers=[subscription["router"]["router_fqdn"]] ) extra_vars = { "dry_run": dry_run, "subscription": subscription, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " f"Update the SDP mesh for L2circuits(epipes) config on PE NOKIA routers", "verb": "update_sdp_mesh", "pe_router_list": { subscription["router"]["router_fqdn"]: { "lo4": str(subscription["router"]["router_lo_ipv4_address"]), "lo6": str(subscription["router"]["router_lo_ipv6_address"]), } }, } return { "playbook_name": "gap_ansible/playbooks/update_pe_sdp_mesh.yaml", "inventory": inventory, "extra_vars": extra_vars, } def _update_sdp_single_pe( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: extra_vars = { "dry_run": dry_run, "subscription": subscription, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " f"Update the SDP mesh for L2circuits(epipes) config on PE NOKIA routers", "verb": "update_sdp_mesh", "pe_router_list": generate_inventory_for_active_routers( router_role=RouterRole.PE, exclude_routers=[subscription["router"]["router_fqdn"]], )["all"]["hosts"], } if not extra_vars["pe_router_list"]: return { "playbook_name": "", "inventory": {"all": {"hosts": {}}}, "extra_vars": {}, } return { "playbook_name": "gap_ansible/playbooks/update_pe_sdp_mesh.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": extra_vars, } def _add_pe_mesh_to_pe( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: extra_vars = { "dry_run": dry_run, "subscription": subscription, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " f"Add list of PE routers into iGEANT/iGEANT6 groups of the PE router", "verb": "add_pe_mesh_to_pe", "pe_router_list": generate_inventory_for_active_routers( router_role=RouterRole.PE, exclude_routers=[subscription["router"]["router_fqdn"]] )["all"]["hosts"], } if not extra_vars["pe_router_list"]: return { "playbook_name": "", "inventory": {"all": {"hosts": {}}}, "extra_vars": {}, } return { "playbook_name": "gap_ansible/playbooks/update_ibgp_mesh.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": extra_vars, } def _add_pe_to_pe_mesh( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: inventory = generate_inventory_for_active_routers( router_role=RouterRole.PE, exclude_routers=[subscription["router"]["router_fqdn"]] ) extra_vars = { "dry_run": dry_run, "subscription": subscription, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " f"Add the PE router to all PE routers in iGEANT/iGEANT6.", "verb": "add_pe_to_pe_mesh", } return { "playbook_name": "gap_ansible/playbooks/update_ibgp_mesh.yaml", "inventory": inventory, "extra_vars": extra_vars, } def _add_all_p_to_pe( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: extra_vars = { "dry_run": dry_run, "subscription": subscription, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Add all P-routers to this new PE", "verb": "add_all_p_to_pe", "p_router_list": generate_inventory_for_active_routers( router_role=RouterRole.P, exclude_routers=[subscription["router"]["router_fqdn"]] )["all"]["hosts"], } if not extra_vars["p_router_list"]: return { "playbook_name": "", "inventory": {"all": {"hosts": {}}}, "extra_vars": {}, } return { "playbook_name": "gap_ansible/playbooks/update_ibgp_mesh.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": extra_vars, } def _add_pe_to_all_p( subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, *, dry_run: bool, ) -> LSOState: inventory = generate_inventory_for_active_routers( router_role=RouterRole.P, exclude_routers=[subscription["router"]["router_fqdn"]] ) extra_vars = { "dry_run": dry_run, "subscription": subscription, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " f"Add promoted router to all PE routers in iGEANT/iGEANT6", "verb": "add_pe_to_all_p", } return { "playbook_name": "gap_ansible/playbooks/update_ibgp_mesh.yaml", "inventory": inventory, "extra_vars": extra_vars, } @step("[DRY RUN] Deploy base config") def deploy_base_config_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of provisioning base config on a router.""" return _deploy_base_config(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Deploy base config") def deploy_base_config_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Deploy base config on a router using the provisioning proxy.""" return _deploy_base_config(subscription, tt_number, process_id, dry_run=False) @step("[DRY RUN] Add the PE to all P routers") def add_pe_to_all_p_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of adding the PE router to all P routers.""" return _add_pe_to_all_p(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Add the PE to all P routers") def add_pe_to_all_p_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a real run of adding the PE router to all P routers.""" return _add_pe_to_all_p(subscription, tt_number, process_id, dry_run=False) @step("[DRY RUN] Add all P routers to the PE") def add_all_p_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of adding all P routers to the PE router.""" return _add_all_p_to_pe(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Add all P routers to the PE") def add_all_p_to_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a real run of adding all P routers to the PE router.""" return _add_all_p_to_pe(subscription, tt_number, process_id, dry_run=False) @step("[DRY RUN] Add the PE to PE mesh") def add_pe_to_pe_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of adding the PE router to all PE routers in iGEANT/iGEANT6.""" return _add_pe_to_pe_mesh(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Add the PE to PE mesh") def add_pe_to_pe_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a real run of adding the PE router to all PE routers in iGEANT/iGEANT6.""" return _add_pe_to_pe_mesh(subscription, tt_number, process_id, dry_run=False) @step("[DRY RUN] Add PE mesh to the PE") def add_pe_mesh_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of adding list of PE routers into iGEANT/iGEANT6 of the router.""" return _add_pe_mesh_to_pe(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Add PE mesh to the PE") def add_pe_mesh_to_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a real run of adding list of PE routers into iGEANT/iGEANT6 of the router.""" return _add_pe_mesh_to_pe(subscription, tt_number, process_id, dry_run=False) @step("[DRY RUN] Include the PE into SDP mesh on other Nokia PEs") def update_sdp_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of including new PE router in SDP mesh on other NOKIA PE routers.""" return _update_sdp_mesh(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Include the PE into SDP mesh on other Nokia PEs") def update_sdp_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a real run of including new PE router in SDP mesh on other NOKIA PE routers.""" return _update_sdp_mesh(subscription, tt_number, process_id, dry_run=False) @step("[DRY RUN] Configure SDP on the PE to all other Nokia PEs") def update_sdp_single_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Perform a dry run of configuring SDP on a new PE router to all other NOKIA PE routers.""" return _update_sdp_single_pe(subscription, tt_number, process_id, dry_run=True) @step("[FOR REAL] Configure SDP on the PE to all other Nokia PEs") def update_sdp_single_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State: """Configure SDP on a new PE router to all other NOKIA PE routers.""" return _update_sdp_single_pe(subscription, tt_number, process_id, dry_run=False) @step("[FOR REAL] Set ISIS metric to very high value") def set_isis_to_max(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState: """Workflow step for setting the :term:`ISIS` metric to an arbitrarily high value to drain a link.""" old_isis_metric = subscription.iptrunk.iptrunk_isis_metric params = load_oss_params() subscription.iptrunk.iptrunk_isis_metric = params.GENERAL.isis_high_metric extra_vars = { "wfo_trunk_json": json.loads(json_dumps(subscription)), "dry_run": False, "verb": "deploy", "config_object": "isis_interface", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " f"{subscription.iptrunk.geant_s_sid}", } return { "subscription": subscription, "playbook_name": "gap_ansible/playbooks/iptrunks.yaml", "inventory": { "all": { "hosts": { subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None, subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn: None, } } }, "extra_vars": extra_vars, "old_isis_metric": old_isis_metric, } @step("Run show commands after base config install") def run_checks_after_base_config(subscription: dict[str, Any]) -> LSOState: """Workflow step for running show commands after installing base config.""" return { "playbook_name": "gap_ansible/playbooks/base_config_checks.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": {"wfo_router_json": subscription}, } @step("Check iBGP session") def check_pe_ibgp(subscription: dict[str, Any]) -> LSOState: """Check the iBGP session.""" extra_vars = { "dry_run": False, "subscription": subscription, "verb": "check_pe_ibgp", } return { "playbook_name": "gap_ansible/playbooks/check_ibgp.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": extra_vars, } @step("Check L3 services") def check_l3_services(subscription: dict[str, Any]) -> LSOState: """Check L3 services.""" extra_vars = { "dry_run": False, "subscription": subscription, "verb": "check_base_ris", } return { "playbook_name": "gap_ansible/playbooks/check_l3_services.yaml", "inventory": {"all": {"hosts": {subscription["router"]["router_fqdn"]: None}}}, "extra_vars": extra_vars, } @inputstep("Prompt for new SharePoint checklist", assignee=Assignee.SYSTEM) def prompt_sharepoint_checklist_url(checklist_url: str) -> FormGenerator: """Prompt the operator with the checklist in SharePoint for approving a new subscription.""" class SharepointPrompt(FormPage): model_config = ConfigDict(title="Complete new checklist") info_label_1: Label = f"A new checklist has been created at: {checklist_url}" info_label_2: Label = "Click proceed to finish the workflow." yield SharepointPrompt return {} _is_moodi_enabled = conditional(lambda _: load_oss_params().MOODI.moodi_enabled) def start_moodi() -> StepList: """Start monitoring on demand using Moodi Telemetry stack.""" host = load_oss_params().MOODI.host @step("Start Moodi") def _start_moodi(subscription: dict[str, Any]) -> LSOState: return { "playbook_name": "moodi_telemetry/playbooks/start_moodi.yaml", "inventory": {"all": {"hosts": {host: None}}}, "extra_vars": {"subscription": subscription}, } return begin >> _is_moodi_enabled(begin >> lso_interaction(_start_moodi)) def stop_moodi() -> StepList: """Stop Moodi Telemetry monitoring on demand.""" host = load_oss_params().MOODI.host @step("Stop Moodi") def _stop_moodi() -> LSOState: return { "playbook_name": "moodi_telemetry/playbooks/stop_moodi.yaml", "inventory": {"all": {"hosts": {host: None}}}, "extra_vars": None, } return begin >> _is_moodi_enabled(begin >> lso_interaction(_stop_moodi))