Skip to content
Snippets Groups Projects
Commit 5f0d59e9 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add modification workflow for routers to change Kentik license

parent 4285b1e1
No related branches found
No related tags found
1 merge request!257Feature/add kentik workflow
"""Add router modification workflow for kentik licenses.
Revision ID: 87a05eddee3e
Revises: 41fd1ae225aq
Create Date: 2024-08-02 15:09:42.597063
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '87a05eddee3e'
down_revision = '41fd1ae225aq'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "modify_router_kentik_license",
"target": "MODIFY",
"description": "Modify Kentik license",
"product_type": "Router"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
......@@ -46,6 +46,7 @@ LazyWorkflowInstance("gso.workflows.router.import_router", "import_router")
LazyWorkflowInstance("gso.workflows.router.create_imported_router", "create_imported_router")
LazyWorkflowInstance("gso.workflows.router.validate_router", "validate_router")
LazyWorkflowInstance("gso.workflows.router.promote_p_to_pe", "promote_p_to_pe")
LazyWorkflowInstance("gso.workflows.router.modify_kentik_license", "modify_router_kentik_license")
# Site workflows
LazyWorkflowInstance("gso.workflows.site.create_site", "create_site")
......
"""A workflow that modifies the Kentik license of a router."""
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.validators import Choice
from gso.products.product_types.router import Router
from gso.services.kentik_client import KentikClient
def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
router = Router.from_subscription(subscription_id)
class ModifyKentikLicenseForm(FormPage):
@staticmethod
def _get_available_plans() -> list[dict[str, str]]:
kentik_plans = KentikClient().get_plans()
return [{plan["name"]: plan["id"]} for plan in kentik_plans]
new_plan_id = Choice("Select a Kentik license", _get_available_plans())
user_input = yield ModifyKentikLicenseForm
return user_input.model_dump() | {"subscription": router}
@step("Update device license in Kentik")
def update_kentik_license(subscription: Router, new_plan_id: int) -> State:
"""Update a Kentik device with a new license attached to it."""
kentik_client = KentikClient()
kentik_device = kentik_client.get_device_by_name(subscription.router.router_fqdn)
if "id" not in kentik_device:
msg = "Failed to find Kentik device by name"
raise ProcessFailureError(msg, details=kentik_device)
updated_kentik_device = kentik_client.update_device(kentik_device["id"], {"plan_id": new_plan_id})
updated_kentik_device.pop("custom_column_data", None)
return {"kentik_device": updated_kentik_device}
@workflow(
"Modify Kentik license",
initial_input_form=wrap_modify_initial_input_form(_initial_input_form),
target=Target.MODIFY,
)
def modify_router_kentik_license() -> StepList:
"""Apply a selected Kentik license on an existing PE router."""
return begin >> store_process_subscription(Target.MODIFY) >> unsync >> update_kentik_license >> resync >> done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment