Skip to content
Snippets Groups Projects
Verified Commit a8f77fa7 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

add base config redeploy workflow

parent 4c9ed628
No related branches found
No related tags found
1 merge request!130Add redeploy base config workflow
Pipeline #85213 passed
"""Add base config redeployment workflow.
Revision ID: b689d4636694
Revises: 815033570ad7
Create Date: 2023-12-27 15:20:40.522053
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'b689d4636694'
down_revision = '815033570ad7'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "redeploy_base_config",
"target": "MODIFY",
"description": "Redeploy base config",
"product_type": "Router"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
......@@ -36,9 +36,10 @@
}
},
"workflow": {
"modify_isis_metric": "Modify the ISIS metric",
"modify_trunk_interface": "Modify IP Trunk interface",
"migrate_iptrunk": "Migrate IP Trunk",
"confirm_info": "Please verify this form looks correct."
}
"confirm_info": "Please verify this form looks correct.",
"migrate_iptrunk": "Migrate IP Trunk",
"modify_isis_metric": "Modify the ISIS metric",
"modify_trunk_interface": "Modify IP Trunk interface",
"redeploy_base_config": "Redeploy base config"
}
}
......@@ -8,6 +8,7 @@ LazyWorkflowInstance("gso.workflows.iptrunk.modify_trunk_interface", "modify_tru
LazyWorkflowInstance("gso.workflows.iptrunk.migrate_iptrunk", "migrate_iptrunk")
LazyWorkflowInstance("gso.workflows.iptrunk.terminate_iptrunk", "terminate_iptrunk")
LazyWorkflowInstance("gso.workflows.router.create_router", "create_router")
LazyWorkflowInstance("gso.workflows.router.redeploy_base_config", "redeploy_base_config")
LazyWorkflowInstance("gso.workflows.router.terminate_router", "terminate_router")
LazyWorkflowInstance("gso.workflows.site.create_site", "create_site")
LazyWorkflowInstance("gso.workflows.site.modify_site", "modify_site")
......
"""A workflow that re-deploys base config on a router."""
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.workflow import StepList, done, init, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.router import Router
from gso.services.provisioning_proxy import pp_interaction
from gso.utils.workflow_steps import deploy_base_config_dry, deploy_base_config_real
def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
router = Router.from_subscription(subscription_id)
class RedeployBaseConfigForm(FormPage):
info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?" # type: ignore[assignment]
tt_number: str
user_input = yield RedeployBaseConfigForm
return user_input.dict() | {"subscription": router}
@workflow(
"Redeploy base config",
initial_input_form=wrap_modify_initial_input_form(_initial_input_form),
target=Target.MODIFY,
)
def redeploy_base_config() -> StepList:
"""Redeploy base config on an existing router.
* Perform a dry run of deployment
* Redeploy base config
"""
return (
init
>> store_process_subscription(Target.MODIFY)
>> unsync
>> pp_interaction(deploy_base_config_dry)
>> pp_interaction(deploy_base_config_real)
>> resync
>> done
)
from unittest.mock import patch
import pytest
from gso.products import Router
from test.workflows import (
assert_complete,
assert_pp_interaction_success,
extract_state,
run_workflow,
)
@pytest.mark.workflow()
@patch("gso.services.provisioning_proxy._send_request")
def test_redeploy_base_config_success(
mock_provision_router,
nokia_router_subscription_factory,
faker,
):
# Set up mock return values
product_id = nokia_router_subscription_factory()
# Run workflow
initial_input_data = [{"subscription_id": product_id}, {"tt_number": faker.tt_number()}]
result, process_stat, step_log = run_workflow("redeploy_base_config", initial_input_data)
for _ in range(2):
result, step_log = assert_pp_interaction_success(result, process_stat, step_log)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Router.from_subscription(subscription_id)
assert subscription.status == "active"
assert mock_provision_router.call_count == 2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment