"""Workflow steps that are shared across multiple workflows.""" import json from typing import Any from orchestrator import step from orchestrator.types import State, UUIDstr from orchestrator.utils.json import json_dumps from gso.products.product_types.iptrunk import Iptrunk from gso.services.provisioning_proxy import execute_playbook def _deploy_base_config( subscription: dict[str, Any], tt_number: str, callback_route: str, process_id: UUIDstr, *, dry_run: bool, ) -> None: inventory = subscription["router"]["router_fqdn"] extra_vars = { "wfo_router_json": subscription, "dry_run": dry_run, "verb": "deploy", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy base config", } execute_playbook( playbook_name="base_config.yaml", callback_route=callback_route, inventory=inventory, extra_vars=extra_vars, ) @step("[DRY RUN] Deploy base config") def deploy_base_config_dry( subscription: dict[str, Any], tt_number: str, callback_route: str, process_id: UUIDstr, ) -> State: """Perform a dry run of provisioning base config on a router.""" _deploy_base_config(subscription, tt_number, callback_route, process_id, dry_run=True) return {"subscription": subscription} @step("[FOR REAL] Deploy base config") def deploy_base_config_real( subscription: dict[str, Any], tt_number: str, callback_route: str, process_id: UUIDstr, ) -> State: """Deploy base config on a router using the provisioning proxy.""" _deploy_base_config(subscription, tt_number, callback_route, process_id, dry_run=False) return {"subscription": subscription} @step("[COMMIT] Set ISIS metric to 90.000") def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Workflow step for setting the :term:`ISIS` metric to 90k as an arbitrarily high value to drain a link.""" old_isis_metric = subscription.iptrunk.iptrunk_isis_metric subscription.iptrunk.iptrunk_isis_metric = 90000 extra_vars = { "wfo_trunk_json": json.loads(json_dumps(subscription)), "dry_run": False, "verb": "deploy", "config_object": "isis_interface", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " f"{subscription.iptrunk.geant_s_sid} ", } execute_playbook( playbook_name="playbooks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) return { "subscription": subscription, "old_isis_metric": old_isis_metric, } @step("[CHECK] Run show commands after base config install") def run_checks_after_base_config(subscription: dict[str, Any], callback_route: str) -> None: """Workflow step for running show commands after installing base config.""" execute_playbook( playbook_name="base_config_checks.yaml", callback_route=callback_route, inventory=subscription["router"]["router_fqdn"], extra_vars={}, ) return { "subscription": subscription, }