-
Karel van Klink authoredKarel van Klink authored
workflow_steps.py 16.45 KiB
"""Workflow steps that are shared across multiple workflows."""
import json
from typing import Any
from orchestrator import inputstep, step
from orchestrator.config.assignee import Assignee
from orchestrator.types import State, UUIDstr
from orchestrator.utils.json import json_dumps
from pydantic import ConfigDict
from pydantic_forms.core import FormPage
from pydantic_forms.types import FormGenerator
from pydantic_forms.validators import Label
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.iptrunk import Iptrunk
from gso.services import lso_client
from gso.settings import load_oss_params
from gso.utils.helpers import generate_inventory_for_active_routers
from gso.utils.shared_enums import Vendor
def _deploy_base_config(
    subscription: dict[str, Any],
    tt_number: str,
    callback_route: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run the ``base_config.yaml`` playbook with verb ``deploy`` against the subscription's router.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``wfo_router_json``.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param callback_route: Callback route handed to the LSO client.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    target_router = subscription["router"]["router_fqdn"]
    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy base config"
    playbook_vars = {
        "wfo_router_json": subscription,
        "dry_run": dry_run,
        "verb": "deploy",
        "commit_comment": commit_comment,
    }

    lso_client.execute_playbook(
        playbook_name="base_config.yaml",
        callback_route=callback_route,
        inventory=target_router,
        extra_vars=playbook_vars,
    )
def _update_sdp_mesh(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run ``update_pe_sdp_mesh.yaml`` on all other active Nokia PE routers to include this router.

    The inventory consists of the active Nokia PE routers, excluding the router of this subscription. When
    that inventory contains no hosts, no playbook is executed at all.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``subscription``.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    router = subscription["router"]
    router_fqdn = router["router_fqdn"]

    inventory = generate_inventory_for_active_routers(
        router_role=RouterRole.PE, router_vendor=Vendor.NOKIA, exclude_routers=[router_fqdn]
    )
    if not inventory["all"]["hosts"]:
        return  # Skip this Ansible interaction if the inventory is empty.

    extra_vars = {
        "dry_run": dry_run,
        "subscription": subscription,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Update the SDP mesh for L2circuits(epipes) config on PE NOKIA routers",
        "verb": "update_sdp_mesh",
        # Only the router of this subscription is listed; its loopback addresses are passed as strings.
        "pe_router_list": {
            router_fqdn: {
                "lo4": str(router["router_lo_ipv4_address"]),
                "lo6": str(router["router_lo_ipv6_address"]),
            }
        },
    }

    lso_client.execute_playbook(
        playbook_name="update_pe_sdp_mesh.yaml",
        callback_route=callback_route,
        inventory=inventory,
        extra_vars=extra_vars,
    )
def _update_sdp_single_pe(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run ``update_pe_sdp_mesh.yaml`` on the subscription's router, listing all other active Nokia PE routers.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``subscription``.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    router_fqdn = subscription["router"]["router_fqdn"]
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Update the SDP mesh for L2circuits(epipes) config on PE NOKIA routers"
    )
    # Hosts of every active Nokia PE router except the one this playbook runs on.
    other_nokia_pes = generate_inventory_for_active_routers(
        router_role=RouterRole.PE,
        router_vendor=Vendor.NOKIA,
        exclude_routers=[router_fqdn],
    )["all"]["hosts"]

    playbook_vars = {
        "dry_run": dry_run,
        "subscription": subscription,
        "commit_comment": commit_comment,
        "verb": "update_sdp_mesh",
        "pe_router_list": other_nokia_pes,
    }
    lso_client.execute_playbook(
        playbook_name="update_pe_sdp_mesh.yaml",
        callback_route=callback_route,
        inventory=router_fqdn,
        extra_vars=playbook_vars,
    )
def _add_pe_mesh_to_pe(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run ``update_ibgp_mesh.yaml`` with verb ``add_pe_mesh_to_pe`` on the subscription's router.

    The hosts of all other active PE routers are passed to the playbook as ``pe_router_list``.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``subscription``.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    router_fqdn = subscription["router"]["router_fqdn"]
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Add list of PE routers into iGEANT/iGEANT6 groups of the PE router"
    )
    other_pe_routers = generate_inventory_for_active_routers(
        router_role=RouterRole.PE, exclude_routers=[router_fqdn]
    )["all"]["hosts"]

    playbook_vars = {
        "dry_run": dry_run,
        "subscription": subscription,
        "commit_comment": commit_comment,
        "verb": "add_pe_mesh_to_pe",
        "pe_router_list": other_pe_routers,
    }
    lso_client.execute_playbook(
        playbook_name="update_ibgp_mesh.yaml",
        callback_route=callback_route,
        inventory=router_fqdn,
        extra_vars=playbook_vars,
    )
def _add_pe_to_pe_mesh(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run ``update_ibgp_mesh.yaml`` with verb ``add_pe_to_pe_mesh`` on all other active PE routers.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``subscription``.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    # The playbook targets every active PE router except the one described by this subscription.
    other_pe_inventory = generate_inventory_for_active_routers(
        router_role=RouterRole.PE, exclude_routers=[subscription["router"]["router_fqdn"]]
    )
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Add the PE router to all PE routers in iGEANT/iGEANT6."
    )
    playbook_vars = {
        "dry_run": dry_run,
        "subscription": subscription,
        "commit_comment": commit_comment,
        "verb": "add_pe_to_pe_mesh",
    }

    lso_client.execute_playbook(
        playbook_name="update_ibgp_mesh.yaml",
        callback_route=callback_route,
        inventory=other_pe_inventory,
        extra_vars=playbook_vars,
    )
def _add_all_p_to_pe(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run ``update_ibgp_mesh.yaml`` with verb ``add_all_p_to_pe`` on the subscription's router.

    The hosts of all active P routers, excluding this router, are passed to the playbook as ``p_router_list``.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``subscription``.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    router_fqdn = subscription["router"]["router_fqdn"]
    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Add all P-routers to this new PE"
    p_routers = generate_inventory_for_active_routers(
        router_role=RouterRole.P, exclude_routers=[router_fqdn]
    )["all"]["hosts"]

    playbook_vars = {
        "dry_run": dry_run,
        "subscription": subscription,
        "commit_comment": commit_comment,
        "verb": "add_all_p_to_pe",
        "p_router_list": p_routers,
    }
    lso_client.execute_playbook(
        playbook_name="update_ibgp_mesh.yaml",
        callback_route=callback_route,
        inventory=router_fqdn,
        extra_vars=playbook_vars,
    )
def _add_pe_to_all_p(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
    *,
    dry_run: bool,
) -> None:
    """Run ``update_ibgp_mesh.yaml`` with verb ``add_pe_to_all_p`` on all active P routers.

    :param subscription: Router subscription as a dictionary, passed to the playbook as ``subscription``.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :param dry_run: Forwarded to the playbook as the ``dry_run`` variable.
    """
    # The playbook targets every active P router, excluding the router described by this subscription.
    p_router_inventory = generate_inventory_for_active_routers(
        router_role=RouterRole.P, exclude_routers=[subscription["router"]["router_fqdn"]]
    )
    # NOTE(review): the comment text mentions "PE routers" while the inventory holds P routers - confirm wording.
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Add promoted router to all PE routers in iGEANT/iGEANT6"
    )
    playbook_vars = {
        "dry_run": dry_run,
        "subscription": subscription,
        "commit_comment": commit_comment,
        "verb": "add_pe_to_all_p",
    }

    lso_client.execute_playbook(
        playbook_name="update_ibgp_mesh.yaml",
        callback_route=callback_route,
        inventory=p_router_inventory,
        extra_vars=playbook_vars,
    )
@step("[DRY RUN] Add the PE to all P routers")
def add_pe_to_all_p_dry(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Dry-run step: add the PE router to all P routers without applying the change.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_pe_to_all_p(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Add the PE to all P routers")
def add_pe_to_all_p_real(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Real-run step: add the PE router to all P routers.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_pe_to_all_p(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=False,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[DRY RUN] Add all P routers to the PE")
def add_all_p_to_pe_dry(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Dry-run step: add all P routers to the PE router without applying the change.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_all_p_to_pe(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Add all P routers to the PE")
def add_all_p_to_pe_real(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Real-run step: add all P routers to the PE router.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_all_p_to_pe(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=False,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[DRY RUN] Add the PE to PE mesh")
def add_pe_to_pe_mesh_dry(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Dry-run step: add the PE router to all PE routers in iGEANT/iGEANT6 without applying the change.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_pe_to_pe_mesh(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Add the PE to PE mesh")
def add_pe_to_pe_mesh_real(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
) -> State:
    """Perform a real run of adding the PE router to all PE routers in iGEANT/iGEANT6.

    :param subscription: Router subscription as a dictionary.
    :param callback_route: Callback route handed to the LSO client.
    :param tt_number: Trouble ticket number, included in the commit comment.
    :param process_id: ID of the current process, included in the commit comment.
    :return: The unchanged subscription under the ``subscription`` key.
    """
    _add_pe_to_pe_mesh(
        subscription=subscription,
        tt_number=tt_number,
        callback_route=callback_route,
        process_id=process_id,
        dry_run=False,  # This is the "for real" variant of the step.
    )
    return {"subscription": subscription}
@step("[DRY RUN] Add PE mesh to the PE")
def add_pe_mesh_to_pe_dry(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Dry-run step: add the list of PE routers into iGEANT/iGEANT6 of the router without applying the change.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_pe_mesh_to_pe(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Add PE mesh to the PE")
def add_pe_mesh_to_pe_real(
    subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> State:
    """Real-run step: add the list of PE routers into iGEANT/iGEANT6 of the router.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _add_pe_mesh_to_pe(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=False,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[DRY RUN] Deploy base config")
def deploy_base_config_dry(
    subscription: dict[str, Any], tt_number: str, callback_route: str, process_id: UUIDstr
) -> State:
    """Dry-run step: provision base config on a router without applying the change.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _deploy_base_config(
        subscription=subscription,
        tt_number=tt_number,
        callback_route=callback_route,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Deploy base config")
def deploy_base_config_real(
    subscription: dict[str, Any], tt_number: str, callback_route: str, process_id: UUIDstr
) -> State:
    """Real-run step: deploy base config on a router using the provisioning proxy.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _deploy_base_config(
        subscription=subscription,
        tt_number=tt_number,
        callback_route=callback_route,
        process_id=process_id,
        dry_run=False,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[DRY RUN] Include the PE into SDP mesh on other Nokia PEs")
def update_sdp_mesh_dry(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
) -> State:
    """Dry-run step: include the new PE router in the SDP mesh on other NOKIA PE routers.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _update_sdp_mesh(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Include the PE into SDP mesh on other Nokia PEs")
def update_sdp_mesh_real(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
) -> State:
    """Real-run step: include the new PE router in the SDP mesh on other NOKIA PE routers.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _update_sdp_mesh(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=False,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[DRY RUN] Configure SDP on the PE to all other Nokia PEs")
def update_sdp_single_pe_dry(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
) -> State:
    """Dry-run step: configure SDP on a new PE router to all other NOKIA PE routers.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _update_sdp_single_pe(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=True,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Configure SDP on the PE to all other Nokia PEs")
def update_sdp_single_pe_real(
    subscription: dict[str, Any],
    callback_route: str,
    tt_number: str,
    process_id: UUIDstr,
) -> State:
    """Real-run step: configure SDP on a new PE router to all other NOKIA PE routers.

    Returns the unchanged subscription under the ``subscription`` key.
    """
    _update_sdp_single_pe(
        subscription=subscription,
        callback_route=callback_route,
        tt_number=tt_number,
        process_id=process_id,
        dry_run=False,
    )

    new_state: State = {"subscription": subscription}
    return new_state
@step("[FOR REAL] Set ISIS metric to very high value")
def set_isis_to_max(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
    """Set the :term:`ISIS` metric of a trunk to the configured high value, in order to drain the link.

    The metric on the subscription is overwritten with ``GENERAL.isis_high_metric`` from the OSS parameters,
    after which the ``iptrunks.yaml`` playbook is run for both sides of the trunk. The previous metric is
    returned in the state as ``old_isis_metric``.
    """
    # Remember the current metric before it gets overwritten on the subscription.
    previous_metric = subscription.iptrunk.iptrunk_isis_metric
    oss_params = load_oss_params()
    subscription.iptrunk.iptrunk_isis_metric = oss_params.GENERAL.isis_high_metric

    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
        f"{subscription.iptrunk.geant_s_sid}"
    )
    playbook_vars = {
        # Round-trip through JSON so that the playbook receives a plain serialised subscription.
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": False,
        "verb": "deploy",
        "config_object": "isis_interface",
        "commit_comment": commit_comment,
    }

    # Inventory is a newline-terminated listing of the routers on both sides of the trunk.
    side_a_fqdn = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn
    side_b_fqdn = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn
    trunk_inventory = f"{side_a_fqdn}\n{side_b_fqdn}\n"

    lso_client.execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=trunk_inventory,
        extra_vars=playbook_vars,
    )

    return {"subscription": subscription, "old_isis_metric": previous_metric}
@step("Run show commands after base config install")
def run_checks_after_base_config(subscription: dict[str, Any], callback_route: str) -> None:
    """Run the ``base_config_checks.yaml`` playbook on the subscription's router after installing base config."""
    router_fqdn = subscription["router"]["router_fqdn"]
    playbook_vars = {"wfo_router_json": subscription}

    lso_client.execute_playbook(
        playbook_name="base_config_checks.yaml",
        callback_route=callback_route,
        inventory=router_fqdn,
        extra_vars=playbook_vars,
    )
@step("Check PE iBGP sessions")
def check_pe_ibgp(subscription: dict[str, Any], callback_route: str) -> None:
    """Run the ``check_ibgp.yaml`` playbook with verb ``check_pe_ibgp`` on the subscription's router."""
    router_fqdn = subscription["router"]["router_fqdn"]
    playbook_vars = {
        "dry_run": False,
        "subscription": subscription,
        "verb": "check_pe_ibgp",
    }

    lso_client.execute_playbook(
        playbook_name="check_ibgp.yaml",
        callback_route=callback_route,
        inventory=router_fqdn,
        extra_vars=playbook_vars,
    )
@step("Check L3 VPRN services")
def check_l3_services(subscription: dict[str, Any], callback_route: str) -> None:
    """Run the ``check_l3_services.yaml`` playbook with verb ``check_base_ris`` on the subscription's router."""
    router_fqdn = subscription["router"]["router_fqdn"]
    playbook_vars = {
        "dry_run": False,
        "subscription": subscription,
        "verb": "check_base_ris",
    }

    lso_client.execute_playbook(
        playbook_name="check_l3_services.yaml",
        callback_route=callback_route,
        inventory=router_fqdn,
        extra_vars=playbook_vars,
    )
@inputstep("Prompt for new SharePoint checklist", assignee=Assignee.SYSTEM)
def prompt_sharepoint_checklist_url(checklist_url: str) -> FormGenerator:
    """Show the operator the URL of the newly created SharePoint checklist and wait for confirmation.

    The form only contains informational labels; no user input is added to the state.
    """
    checklist_notice = f"A new checklist has been created at: {checklist_url}"

    class SharepointPrompt(FormPage):
        model_config = ConfigDict(title="Complete new checklist")

        info_label_1: Label = checklist_notice
        info_label_2: Label = "Click proceed to finish the workflow."

    yield SharepointPrompt

    return {}