Skip to content
Snippets Groups Projects
Commit 19aaac4f authored by Karel van Klink's avatar Karel van Klink :smiley_cat: Committed by Mohammad Torkashvand
Browse files

Introduce an LSOState type, but we can't use it in python 3.12 yet

parent 247c178d
No related branches found
No related tags found
1 merge request!277Rework LSO interaction
Pipeline #89387 passed
This commit is part of merge request !277. Comments created here will be created in the context of that merge request.
Showing with 104 additions and 93 deletions
......@@ -5,7 +5,7 @@
import json
import logging
from typing import Any
from typing import Any, Literal, TypedDict
import requests
from orchestrator import step
......@@ -23,6 +23,18 @@ from gso import settings
logger = logging.getLogger(__name__)
class _LSOState(TypedDict): # noqa: PYI049
"""An expanded state that must contain at least the required keys for the execution of an Ansible playbook."""
playbook_name: str
extra_vars: dict[str, Any]
inventory: dict[Literal["all"], dict[Literal["hosts"], dict[str, Any] | None]]
__extra_values__: Any # This is feature unavailable in python 3.12
LSOState = State # FIXME: Use the above definition when python3.13 is released
def _send_request(parameters: dict, callback_route: str) -> None:
"""Send a request to :term:`LSO`. The callback address is derived using the process ID provided.
......
......@@ -13,6 +13,7 @@ from pydantic_forms.types import FormGenerator
from pydantic_forms.validators import Label
from gso.products.product_types.iptrunk import Iptrunk
from gso.services.lso_client import LSOState
from gso.settings import load_oss_params
......@@ -22,7 +23,7 @@ def _deploy_base_config(
process_id: UUIDstr,
*,
dry_run: bool,
) -> State:
) -> LSOState:
extra_vars = {
"wfo_router_json": subscription,
"dry_run": dry_run,
......@@ -50,7 +51,7 @@ def deploy_base_config_real(subscription: dict[str, Any], tt_number: str, proces
@step("[FOR REAL] Set ISIS metric to very high value")
def set_isis_to_max(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def set_isis_to_max(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Workflow step for setting the :term:`ISIS` metric to an arbitrarily high value to drain a link."""
old_isis_metric = subscription.iptrunk.iptrunk_isis_metric
params = load_oss_params()
......@@ -81,7 +82,7 @@ def set_isis_to_max(subscription: Iptrunk, process_id: UUIDstr, tt_number: str)
@step("Run show commands after base config install")
def run_checks_after_base_config(subscription: dict[str, Any]) -> State:
def run_checks_after_base_config(subscription: dict[str, Any]) -> LSOState:
"""Workflow step for running show commands after installing base config."""
return {
"playbook_name": "base_config_checks.yaml",
......
......@@ -28,7 +28,7 @@ from gso.products.product_blocks.iptrunk import (
from gso.products.product_types.iptrunk import Iptrunk, IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.services.sharepoint import SharePointClient
......@@ -325,7 +325,7 @@ def initialize_subscription(
@step("[DRY RUN] Provision IP trunk interface")
def provision_ip_trunk_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> State:
def provision_ip_trunk_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Perform a dry run of deploying configuration on both sides of the trunk."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -351,7 +351,7 @@ def provision_ip_trunk_iface_dry(subscription: IptrunkInactive, process_id: UUID
@step("[FOR REAL] Provision IP trunk interface")
def provision_ip_trunk_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> State:
def provision_ip_trunk_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Deploy IP trunk configuration on both sides."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -377,7 +377,7 @@ def provision_ip_trunk_iface_real(subscription: IptrunkInactive, process_id: UUI
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(subscription: IptrunkInactive) -> State:
def check_ip_trunk_connectivity(subscription: IptrunkInactive) -> LSOState:
"""Check successful connectivity across the new trunk."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"}
......@@ -389,7 +389,7 @@ def check_ip_trunk_connectivity(subscription: IptrunkInactive) -> State:
@step("[DRY RUN] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> State:
def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Perform a dry run of deploying :term:`ISIS` configuration."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -415,7 +415,7 @@ def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id:
@step("[FOR REAL] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> State:
def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Deploy :term:`ISIS` configuration on both sides."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -441,7 +441,7 @@ def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id
@step("Check ISIS adjacency")
def check_ip_trunk_isis(subscription: IptrunkInactive) -> State:
def check_ip_trunk_isis(subscription: IptrunkInactive) -> LSOState:
"""Run an Ansible playbook to confirm :term:`ISIS` adjacency."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}
......
......@@ -5,14 +5,14 @@ import json
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.iptrunk import Iptrunk
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.utils.types.tt_number import TTNumber
......@@ -33,7 +33,7 @@ def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("[DRY RUN] Deploy TWAMP on both sides")
def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Perform a dry run of deploying the :term:`TWAMP` session."""
extra_vars = {
"subscription": json.loads(json_dumps(subscription)),
......@@ -58,7 +58,7 @@ def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str)
@step("[FOR REAL] Deploy TWAMP on both sides")
def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Deploy the :term:`TWAMP` session."""
extra_vars = {
"subscription": json.loads(json_dumps(subscription)),
......@@ -83,7 +83,7 @@ def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str
@step("Check TWAMP status on both sides")
def check_twamp_status(subscription: Iptrunk) -> State:
def check_twamp_status(subscription: Iptrunk) -> LSOState:
"""Check TWAMP session."""
extra_vars = {
"subscription": json.loads(json_dumps(subscription)),
......
......@@ -28,7 +28,7 @@ from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock, IptrunkTy
from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import infoblox
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.sharepoint import SharePointClient
from gso.services.subscriptions import get_active_router_subscriptions
......@@ -203,7 +203,7 @@ def calculate_old_side_data(subscription: Iptrunk, replace_index: int) -> State:
@step("Check Optical PRE levels on the trunk endpoint")
def check_ip_trunk_optical_levels_pre(subscription: Iptrunk) -> State:
def check_ip_trunk_optical_levels_pre(subscription: Iptrunk) -> LSOState:
"""Check Optical PRE levels on the trunk."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "optical_pre"}
......@@ -224,7 +224,7 @@ def check_ip_trunk_optical_levels_pre(subscription: Iptrunk) -> State:
@step("Check Optical POST levels on the trunk endpoint")
def check_ip_trunk_optical_levels_post(
subscription: Iptrunk, new_node: Router, new_lag_member_interfaces: list[dict], replace_index: int
) -> State:
) -> LSOState:
"""Check Optical POST levels on the trunk."""
extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
......@@ -251,7 +251,7 @@ def check_ip_trunk_optical_levels_post(
@step("Check LLDP on the trunk endpoints")
def check_ip_trunk_lldp(
subscription: Iptrunk, new_node: Router, new_lag_member_interfaces: list[dict], replace_index: int
) -> State:
) -> LSOState:
"""Check LLDP on the new trunk endpoints."""
extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
......@@ -284,7 +284,7 @@ def disable_old_config_dry(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Perform a dry run of disabling the old configuration on the routers."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -323,7 +323,7 @@ def disable_old_config_real(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Disable old configuration on the routers."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -362,7 +362,7 @@ def deploy_new_config_dry(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Perform a dry run of deploying configuration on the new router."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -401,7 +401,7 @@ def deploy_new_config_real(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Deploy configuration on the new router."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -434,7 +434,7 @@ def deploy_new_config_real(
@step("[DRY RUN] Update BFD on the remaining side")
def update_remaining_side_bfd_dry(
subscription: Iptrunk, new_node: Router, replace_index: int, process_id: UUIDstr, tt_number: str
) -> State:
) -> LSOState:
"""Perform a dry run of updating configuration on the remaining router."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -462,7 +462,7 @@ def update_remaining_side_bfd_dry(
@step("[FOR REAL] Update BFD on the remaining side")
def update_remaining_side_bfd_real(
subscription: Iptrunk, new_node: Router, replace_index: int, process_id: UUIDstr, tt_number: str
) -> State:
) -> LSOState:
"""Update configuration on the remaining router."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -488,7 +488,7 @@ def update_remaining_side_bfd_real(
@step("Check BFD session over trunk")
def check_ip_trunk_bfd(subscription: Iptrunk, new_node: Router, replace_index: int) -> State:
def check_ip_trunk_bfd(subscription: Iptrunk, new_node: Router, replace_index: int) -> LSOState:
"""Check BFD session across the new trunk."""
extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
......@@ -524,7 +524,7 @@ def confirm_continue_move_fiber() -> FormGenerator:
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(subscription: Iptrunk, replace_index: int) -> State:
def check_ip_trunk_connectivity(subscription: Iptrunk, replace_index: int) -> LSOState:
"""Check successful connectivity across the new trunk."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"}
......@@ -550,7 +550,7 @@ def deploy_new_isis(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Deploy :term:`ISIS` configuration."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -581,7 +581,7 @@ def deploy_new_isis(
@step("Check ISIS adjacency")
def check_ip_trunk_isis(subscription: Iptrunk, replace_index: int) -> State:
def check_ip_trunk_isis(subscription: Iptrunk, replace_index: int) -> LSOState:
"""Run an Ansible playbook to confirm :term:`ISIS` adjacency."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}
......@@ -618,7 +618,7 @@ def restore_isis_metric(
process_id: UUIDstr,
tt_number: str,
old_isis_metric: int,
) -> State:
) -> LSOState:
"""Restore the :term:`ISIS` metric to its original value."""
subscription.iptrunk.iptrunk_isis_metric = old_isis_metric
extra_vars = {
......@@ -653,7 +653,7 @@ def delete_old_config_dry(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Perform a dry run of deleting the old configuration."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -692,7 +692,7 @@ def delete_old_config_real(
replace_index: int,
process_id: UUIDstr,
tt_number: str,
) -> State:
) -> LSOState:
"""Delete old configuration from the routers."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......
......@@ -11,7 +11,7 @@ from orchestrator.workflows.steps import resync, store_process_subscription, uns
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.iptrunk import Iptrunk
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.utils.types.tt_number import TTNumber
......@@ -37,7 +37,7 @@ def modify_iptrunk_subscription(subscription: Iptrunk, isis_metric: int) -> Stat
@step("[DRY RUN] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def provision_ip_trunk_isis_iface_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Perform a dry run of deploying the new :term:`ISIS` metric on both sides of the trunk."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -63,7 +63,7 @@ def provision_ip_trunk_isis_iface_dry(subscription: Iptrunk, process_id: UUIDstr
@step("[FOR REAL] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def provision_ip_trunk_isis_iface_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Deploy the new :term:`ISIS` metric on both sides of the trunk."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......
......@@ -21,7 +21,7 @@ from gso.products.product_blocks.iptrunk import (
IptrunkType,
)
from gso.products.product_types.iptrunk import Iptrunk
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.helpers import (
available_interfaces_choices,
......@@ -186,7 +186,7 @@ def determine_change_in_capacity(
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(subscription: Iptrunk) -> State:
def check_ip_trunk_connectivity(subscription: Iptrunk) -> LSOState:
"""Check successful connectivity across a trunk."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"}
......@@ -204,7 +204,7 @@ def check_ip_trunk_connectivity(subscription: Iptrunk) -> State:
@step("Check LLDP on the trunk endpoints")
def check_ip_trunk_lldp(subscription: Iptrunk) -> State:
def check_ip_trunk_lldp(subscription: Iptrunk) -> LSOState:
"""Check LLDP on trunk endpoints."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "lldp"}
......@@ -308,7 +308,7 @@ def modify_iptrunk_subscription(
@step("[DRY RUN] Provision IP trunk interface")
def provision_ip_trunk_iface_dry(
subscription: Iptrunk, process_id: UUIDstr, tt_number: str, removed_ae_members: list[str]
) -> State:
) -> LSOState:
"""Perform a dry run of deploying the updated IP trunk."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -337,7 +337,7 @@ def provision_ip_trunk_iface_dry(
@step("[FOR REAL] Provision IP trunk interface")
def provision_ip_trunk_iface_real(
subscription: Iptrunk, process_id: UUIDstr, tt_number: str, removed_ae_members: list[str]
) -> State:
) -> LSOState:
"""Provision the new IP trunk with updated interfaces."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -459,7 +459,7 @@ def allocate_interfaces_in_netbox_side_b(subscription: Iptrunk, previous_ae_memb
@step("Check Optical POST levels on the trunk endpoint")
def check_ip_trunk_optical_levels_post(subscription: Iptrunk) -> State:
def check_ip_trunk_optical_levels_post(subscription: Iptrunk) -> LSOState:
"""Check Optical POST levels on the trunk."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "optical_post"}
......
......@@ -20,7 +20,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_blocks.iptrunk import IptrunkSideBlock
from gso.products.product_types.iptrunk import Iptrunk
from gso.services import infoblox
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.helpers import get_router_vendor
from gso.utils.shared_enums import Vendor
......@@ -51,7 +51,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("[DRY RUN] Deprovision IP trunk")
def deprovision_ip_trunk_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def deprovision_ip_trunk_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Perform a dry run of deleting configuration from the routers."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......@@ -77,7 +77,7 @@ def deprovision_ip_trunk_dry(subscription: Iptrunk, process_id: UUIDstr, tt_numb
@step("[FOR REAL] Deprovision IP trunk")
def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State:
def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Delete configuration from the routers."""
extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)),
......
......@@ -8,18 +8,17 @@ from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, conditional, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import State
from gso.products.product_types.iptrunk import Iptrunk
from gso.services import infoblox
from gso.services.lso_client import anonymous_lso_interaction
from gso.services.lso_client import LSOState, anonymous_lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.helpers import get_router_vendor
from gso.utils.shared_enums import Vendor
@step("Validate IP trunk configuration")
def validate_router_config(subscription: Iptrunk) -> State:
def validate_router_config(subscription: Iptrunk) -> LSOState:
"""Run an Ansible playbook that validates the configuration that is present on an active IP trunk."""
extra_vars = {"wfo_trunk_json": json.loads(json_dumps(subscription)), "verb": "validate"}
......@@ -135,7 +134,7 @@ def verify_netbox_entries(subscription: Iptrunk) -> None:
@step("Verify configuration of IPtrunk")
def verify_iptrunk_config(subscription: Iptrunk) -> State:
def verify_iptrunk_config(subscription: Iptrunk) -> LSOState:
"""Check for configuration drift on the relevant routers."""
return {
"playbook_name": "iptrunks.yaml",
......@@ -158,7 +157,7 @@ def verify_iptrunk_config(subscription: Iptrunk) -> State:
@step("Check ISIS configuration")
def check_ip_trunk_isis(subscription: Iptrunk) -> State:
def check_ip_trunk_isis(subscription: Iptrunk) -> LSOState:
"""Run an Ansible playbook to check for any :term:`ISIS` configuration drift."""
return {
"playbook_name": "iptrunks.yaml",
......@@ -181,7 +180,7 @@ def check_ip_trunk_isis(subscription: Iptrunk) -> State:
@step("Verify TWAMP configuration")
def verify_twamp_config(subscription: Iptrunk) -> State:
def verify_twamp_config(subscription: Iptrunk) -> LSOState:
"""Check for configuration drift of TWAMP."""
return {
"playbook_name": "deploy_twamp.yaml",
......
......@@ -18,7 +18,7 @@ from pydantic import ConfigDict, model_validator
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services.kentik_client import KentikClient, NewKentikDevice
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.subscriptions import get_all_active_sites
from gso.utils.helpers import generate_inventory_for_active_routers
from gso.utils.shared_enums import Vendor
......@@ -48,7 +48,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("Evacuate the router by setting isis_overload")
def set_isis_overload(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def set_isis_overload(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Evacuate the router by setting isis overload."""
extra_vars = {
"dry_run": False,
......@@ -65,7 +65,7 @@ def set_isis_overload(subscription: dict[str, Any], tt_number: str, process_id:
@step("[DRY RUN] Deploy PE base config")
def deploy_pe_base_config_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def deploy_pe_base_config_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of adding the base config to the router."""
extra_vars = {
"dry_run": True,
......@@ -84,7 +84,7 @@ def deploy_pe_base_config_dry(subscription: dict[str, Any], tt_number: str, proc
@step("[FOR REAL] Deploy PE base config")
def deploy_pe_base_config_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def deploy_pe_base_config_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of adding the base config to the router."""
extra_vars = {
"dry_run": False,
......@@ -151,7 +151,7 @@ def create_kentik_device(subscription: Router) -> State:
@step("[DRY RUN] Include new PE into SDP mesh on other Nokia PEs")
def update_sdp_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def update_sdp_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run for updating the SDP mesh with the new router."""
extra_vars = {
"dry_run": True,
......@@ -175,7 +175,7 @@ def update_sdp_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id
@step("[FOR REAL] Include new PE into SDP mesh on other Nokia PEs")
def update_sdp_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def update_sdp_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Update the SDP mesh for L2 circuits(epipes) config on PE NOKIA routers."""
extra_vars = {
"dry_run": False,
......@@ -199,7 +199,7 @@ def update_sdp_mesh_real(subscription: dict[str, Any], tt_number: str, process_i
@step("[DRY RUN] Remove P from all PEs")
def remove_p_from_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def remove_p_from_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of removing the P router from all the PE routers."""
extra_vars = {
"dry_run": True,
......@@ -217,7 +217,7 @@ def remove_p_from_pe_dry(subscription: dict[str, Any], tt_number: str, process_i
@step("[FOR REAL] Remove P from all PEs")
def remove_p_from_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def remove_p_from_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Remove the P router from all the PE routers."""
extra_vars = {
"dry_run": False,
......@@ -235,7 +235,7 @@ def remove_p_from_pe_real(subscription: dict[str, Any], tt_number: str, process_
@step("[DRY RUN] Add PE mesh to PE")
def add_pe_mesh_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_pe_mesh_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of adding list of PE routers into iGEANT/iGEANT6 of promoted router."""
extra_vars = {
"dry_run": True,
......@@ -254,7 +254,7 @@ def add_pe_mesh_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_
@step("[FOR REAL] Add PE mesh to PE")
def add_pe_mesh_to_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_pe_mesh_to_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of adding list of PE routers into iGEANT/iGEANT6 of promoted router."""
extra_vars = {
"dry_run": False,
......@@ -273,7 +273,7 @@ def add_pe_mesh_to_pe_real(subscription: dict[str, Any], tt_number: str, process
@step("[DRY RUN] Add PE to PE mesh")
def add_pe_to_pe_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_pe_to_pe_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of adding the promoted router to all PE routers in iGEANT/iGEANT6."""
extra_vars = {
"dry_run": True,
......@@ -291,7 +291,7 @@ def add_pe_to_pe_mesh_dry(subscription: dict[str, Any], tt_number: str, process_
@step("[FOR REAL] Add PE to PE mesh")
def add_pe_to_pe_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_pe_to_pe_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of adding the promoted router to all PE routers in iGEANT/iGEANT6."""
extra_vars = {
"dry_run": False,
......@@ -309,7 +309,7 @@ def add_pe_to_pe_mesh_real(subscription: dict[str, Any], tt_number: str, process
@step("Check iBGP session")
def check_pe_ibgp(subscription: dict[str, Any]) -> State:
def check_pe_ibgp(subscription: dict[str, Any]) -> LSOState:
"""Check the iBGP session."""
extra_vars = {
"dry_run": False,
......@@ -325,7 +325,7 @@ def check_pe_ibgp(subscription: dict[str, Any]) -> State:
@step("[DRY RUN] Deploy routing instances")
def deploy_routing_instances_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def deploy_routing_instances_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of deploying routing instances."""
extra_vars = {
"dry_run": True,
......@@ -342,7 +342,7 @@ def deploy_routing_instances_dry(subscription: dict[str, Any], tt_number: str, p
@step("[FOR REAL] Deploy routing instances")
def deploy_routing_instances_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def deploy_routing_instances_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of deploying routing instances."""
extra_vars = {
"dry_run": False,
......@@ -359,7 +359,7 @@ def deploy_routing_instances_real(subscription: dict[str, Any], tt_number: str,
@step("Check L3 services")
def check_l3_services(subscription: dict[str, Any]) -> State:
def check_l3_services(subscription: dict[str, Any]) -> LSOState:
"""Check L3 services."""
extra_vars = {
"dry_run": False,
......@@ -375,7 +375,7 @@ def check_l3_services(subscription: dict[str, Any]) -> State:
@step("Remove ISIS overload")
def remove_isis_overload(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def remove_isis_overload(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Remove ISIS overload."""
extra_vars = {
"dry_run": False,
......@@ -400,7 +400,7 @@ def update_subscription_model(subscription: Router) -> State:
@step("[DRY RUN] Add all P to this new PE")
def add_all_p_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_all_p_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of adding all P routers to the PE router."""
extra_vars = {
"dry_run": True,
......@@ -420,7 +420,7 @@ def add_all_p_to_pe_dry(subscription: dict[str, Any], tt_number: str, process_id
@step("[FOR REAL] Add all P to this new PE")
def add_all_p_to_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_all_p_to_pe_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of adding all P routers to the PE router."""
extra_vars = {
"dry_run": False,
......@@ -440,7 +440,7 @@ def add_all_p_to_pe_real(subscription: dict[str, Any], tt_number: str, process_i
@step("[DRY RUN] Add this new PE to all P")
def add_pe_to_all_p_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_pe_to_all_p_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of adding promoted router to all PE routers in iGEANT/iGEANT6."""
extra_vars = {
"dry_run": True,
......@@ -460,7 +460,7 @@ def add_pe_to_all_p_dry(subscription: dict[str, Any], tt_number: str, process_id
@step("[FOR REAL] Add this new PE to all P")
def add_pe_to_all_p_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_pe_to_all_p_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of adding promoted router to all PE routers in iGEANT/iGEANT6."""
extra_vars = {
"dry_run": False,
......@@ -480,7 +480,7 @@ def add_pe_to_all_p_real(subscription: dict[str, Any], tt_number: str, process_i
@step("[DRY RUN] Delete default routes")
def delete_default_routes_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def delete_default_routes_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of deleting the default routes."""
extra_vars = {
"dry_run": True,
......@@ -498,7 +498,7 @@ def delete_default_routes_dry(subscription: dict[str, Any], tt_number: str, proc
@step("[FOR REAL] Delete default routes")
def delete_default_routes_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def delete_default_routes_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of deleting the default routes."""
extra_vars = {
"dry_run": False,
......
......@@ -19,14 +19,13 @@ from orchestrator.workflows.steps import (
unsync,
)
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import State
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import infoblox
from gso.services.kentik_client import KentikClient
from gso.services.librenms_client import LibreNMSClient
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.settings import load_oss_params
from gso.utils.helpers import generate_inventory_for_active_routers
......@@ -70,7 +69,7 @@ def deprovision_loopback_ips(subscription: Router) -> dict:
@step("[DRY RUN] Remove configuration from router")
def remove_config_from_router_dry(subscription: Router, process_id: UUIDstr, tt_number: str) -> State:
def remove_config_from_router_dry(subscription: Router, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Remove configuration from the router, first as a dry run."""
extra_vars = {
"wfo_router_json": json.loads(json_dumps(subscription)),
......@@ -88,7 +87,7 @@ def remove_config_from_router_dry(subscription: Router, process_id: UUIDstr, tt_
@step("[FOR REAL] Remove configuration from router")
def remove_config_from_router_real(subscription: Router, process_id: UUIDstr, tt_number: str) -> State:
def remove_config_from_router_real(subscription: Router, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Remove configuration from the router."""
extra_vars = {
"wfo_router_json": json.loads(json_dumps(subscription)),
......@@ -113,7 +112,7 @@ def remove_device_from_netbox(subscription: Router) -> dict[str, Router]:
@step("[DRY RUN] Remove P router from all the PE routers")
def remove_p_from_all_pe_dry(subscription: Router, tt_number: str, process_id: UUIDstr) -> State:
def remove_p_from_all_pe_dry(subscription: Router, tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of removing the terminated router from all the PE routers."""
extra_vars = {
"dry_run": True,
......@@ -131,7 +130,7 @@ def remove_p_from_all_pe_dry(subscription: Router, tt_number: str, process_id: U
@step("[REAL RUN] Remove P router from all the PE routers")
def remove_p_from_all_pe_real(subscription: Router, tt_number: str, process_id: UUIDstr) -> State:
def remove_p_from_all_pe_real(subscription: Router, tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of removing the terminated router from all the PE routers."""
extra_vars = {
"dry_run": False,
......@@ -149,7 +148,7 @@ def remove_p_from_all_pe_real(subscription: Router, tt_number: str, process_id:
@step("[DRY RUN] Remove PE router from all the PE routers")
def remove_pe_from_all_pe_dry(subscription: Router, tt_number: str, process_id: UUIDstr) -> State:
def remove_pe_from_all_pe_dry(subscription: Router, tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of removing the terminated PE router from the PE router mesh."""
extra_vars = {
"dry_run": True,
......@@ -169,7 +168,7 @@ def remove_pe_from_all_pe_dry(subscription: Router, tt_number: str, process_id:
@step("[REAL RUN] Remove all PE routers from all the PE routers")
def remove_pe_from_all_pe_real(subscription: Router, tt_number: str, process_id: UUIDstr) -> State:
def remove_pe_from_all_pe_real(subscription: Router, tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of removing terminated PE router from PE the router mesh."""
extra_vars = {
"dry_run": False,
......@@ -189,7 +188,7 @@ def remove_pe_from_all_pe_real(subscription: Router, tt_number: str, process_id:
@step("[DRY RUN] Remove PE router from all the P routers")
def remove_pe_from_all_p_dry(subscription: Router, tt_number: str, process_id: UUIDstr) -> State:
def remove_pe_from_all_p_dry(subscription: Router, tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of removing PE router from all P routers."""
extra_vars = {
"dry_run": True,
......@@ -207,7 +206,7 @@ def remove_pe_from_all_p_dry(subscription: Router, tt_number: str, process_id: U
@step("[REAL RUN] Remove PE router from all P routers")
def remove_pe_from_all_p_real(subscription: Router, tt_number: str, process_id: UUIDstr) -> State:
def remove_pe_from_all_p_real(subscription: Router, tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a real run of removing PE router from all P routers."""
extra_vars = {
"dry_run": False,
......
......@@ -15,7 +15,7 @@ from pydantic import ConfigDict, model_validator
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import librenms_client
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.subscriptions import get_trunks_that_terminate_on_router
from gso.utils.helpers import generate_inventory_for_active_routers
from gso.utils.types.snmp import SNMPVersion
......@@ -52,7 +52,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("[DRY RUN] Add P router to iBGP mesh")
def add_p_to_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_p_to_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Perform a dry run of adding the new P router to the PE router mesh."""
extra_vars = {
"dry_run": True,
......@@ -69,7 +69,7 @@ def add_p_to_mesh_dry(subscription: dict[str, Any], tt_number: str, process_id:
@step("[FOR REAL] Add P router to iBGP mesh")
def add_p_to_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_p_to_mesh_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Add the P router to the mesh of PE routers."""
extra_vars = {
"dry_run": False,
......@@ -86,7 +86,7 @@ def add_p_to_mesh_real(subscription: dict[str, Any], tt_number: str, process_id:
@step("[DRY RUN] Add all PE routers to P router iBGP group")
def add_all_pe_to_p_dry(subscription: dict[str, Any]) -> State:
def add_all_pe_to_p_dry(subscription: dict[str, Any]) -> LSOState:
"""Perform a dry run of adding the list of all PE routers to the new P router."""
extra_vars = {
"dry_run": True,
......@@ -103,7 +103,7 @@ def add_all_pe_to_p_dry(subscription: dict[str, Any]) -> State:
@step("[FOR REAL] Add all PE routers to P router iBGP group")
def add_all_pe_to_p_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> State:
def add_all_pe_to_p_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Add the list of all PE routers to the new P router."""
extra_vars = {
"dry_run": False,
......@@ -121,7 +121,7 @@ def add_all_pe_to_p_real(subscription: dict[str, Any], tt_number: str, process_i
@step("Verify iBGP session health")
def check_ibgp_session(subscription: Router) -> State:
def check_ibgp_session(subscription: Router) -> LSOState:
"""Run a playbook using the provisioning proxy, to check the health of the new iBGP session."""
return {
"playbook_name": "check_ibgp.yaml",
......
......@@ -14,7 +14,7 @@ from gso.products.product_types.router import Router
from gso.services import infoblox
from gso.services.kentik_client import KentikClient
from gso.services.librenms_client import LibreNMSClient
from gso.services.lso_client import anonymous_lso_interaction
from gso.services.lso_client import LSOState, anonymous_lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.helpers import generate_inventory_for_active_routers
from gso.utils.shared_enums import Vendor
......@@ -53,7 +53,7 @@ def check_netbox_entry_exists(subscription: Router) -> None:
@step("Verify BGP configuration on P router")
def verify_p_ibgp(subscription: dict[str, Any]) -> State:
def verify_p_ibgp(subscription: dict[str, Any]) -> LSOState:
"""Perform a dry run of adding the list of all PE routers to the new P router."""
extra_vars = {
"dry_run": True,
......@@ -97,7 +97,7 @@ def check_kentik_entry_exists(subscription: Router) -> None:
@step("Check base config for drift")
def verify_base_config(subscription: dict[str, Any]) -> State:
def verify_base_config(subscription: dict[str, Any]) -> LSOState:
"""Workflow step for running a playbook that checks whether base config has drifted."""
return {
"playbook_name": "base_config.yaml",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment