Commit 955fdd15 authored by Mohammad Torkashvand

Leverage start_process with Celery to launch bulk workflows and handle any missing scenarios

parent 13b389af
1 merge request: !429 Feature/mass base config redeploy
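In outline, the commit swaps the previous synchronous HTTP loop for a dispatch-and-callback pattern: a workflow step hands the selected routers and a callback route to a Celery task, the task starts one redeploy_base_config workflow per router via start_process, waits for each to reach a terminal state, and finally POSTs the aggregated per-FQDN results back to the parent workflow for evaluation. A stripped-down sketch of that pattern follows; run_one is a hypothetical placeholder for the real start_process / wait_for_workflow_to_stop calls shown in the diff below, and bulk_redeploy_sketch is an illustration, not the actual task.

    import requests
    from celery import shared_task


    def run_one(router_id: str) -> None:
        """Hypothetical stand-in for start_process("redeploy_base_config", ...) plus waiting for it to stop."""


    @shared_task
    def bulk_redeploy_sketch(router_ids: list[str], callback_url: str) -> None:
        """Simplified illustration of massive_redeploy_base_config_task, not the real implementation."""
        failed: dict[str, str] = {}
        successful: dict[str, str] = {}
        for router_id in router_ids:
            try:
                run_one(router_id)
                successful[router_id] = "Done"
            except Exception as exc:  # noqa: BLE001 -- one bad router must not abort the whole batch
                failed[router_id] = str(exc)
        # Resume the suspended parent workflow by POSTing the result to its callback route.
        requests.post(callback_url, json={"failed_wfs": failed, "successful_wfs": successful}, timeout=30)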
@@ -56,7 +56,7 @@ def _send_request(parameters: dict, callback_route: str) -> None:
logger.debug(debug_msg)
parameters.update({"callback": callback_url})
url = f"{params.scheme}://{params.api_base}/api/playbook"
url = f"{params.scheme}://{params.api_base}/api/playbook/"
request = requests.post(url, json=parameters, timeout=10)
request.raise_for_status()
@@ -4,11 +4,13 @@ This prevents someone from having to re-write database statements many times, th
or inconsistent when not careful. These methods are related to operations regarding processes and workflows.
"""
from orchestrator.db import ProcessTable, WorkflowTable, db
from orchestrator.workflow import ProcessStatus
from uuid import UUID
from orchestrator.db import ProcessStepTable, ProcessSubscriptionTable, ProcessTable, WorkflowTable, db
from orchestrator.workflow import ProcessStatus, StepStatus
from pydantic_forms.types import UUIDstr
from sqlalchemy import ScalarResult, and_, or_, select
from sqlalchemy.orm import Query
from sqlalchemy.orm import Query, joinedload
def get_processes_by_workflow_name(workflow_name: str) -> Query:
@@ -65,3 +67,21 @@ def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarRes
)
)
)
def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None:
"""Get a stopped process by its ID."""
return (
db.session.query(ProcessTable)
.join(ProcessTable.steps)
.filter(
ProcessTable.process_id == process_id,
ProcessStepTable.status.in_({StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}),
)
.options(
joinedload(ProcessTable.steps),
joinedload(ProcessTable.process_subscriptions).joinedload(ProcessSubscriptionTable.subscription),
)
.order_by(ProcessTable.last_modified_at)
.one_or_none()
)
"""Massive redeploy base config task for routers."""
import logging
import requests
from celery import shared_task
from orchestrator.services.processes import start_process
from orchestrator.workflow import ProcessStatus
from pydantic_forms.exceptions import FormValidationError
from pydantic_forms.types import UUIDstr
from gso import settings
from gso.products.product_types.router import Router
from gso.utils.helpers import wait_for_workflow_to_stop
from gso.utils.types.tt_number import TTNumber
logger = logging.getLogger(__name__)
@shared_task
def massive_redeploy_base_config_task(
selected_routers: list[UUIDstr], tt_number: TTNumber, callback_route: str
) -> None:
"""Massive redeploy base config task for routers."""
failed_wfs: dict[str, str] = {}
successful_wfs: dict[str, str] = {}
for router_id in selected_routers:
fqdn = Router.from_subscription(router_id).router.router_fqdn
try:
process_id = start_process(
"redeploy_base_config",
user_inputs=[{"subscription_id": router_id}, {"tt_number": tt_number, "is_massive_redeploy": True}],
)
process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60)
if not process:
failed_wfs[fqdn] = "Timed out waiting for workflow to complete"
continue
if process.last_step == "Done" and process.last_status == ProcessStatus.COMPLETED:
successful_wfs[fqdn] = "Done"
elif process.last_status == ProcessStatus.ABORTED:
failed_wfs[fqdn] = "Workflow was aborted"
elif process.last_status == ProcessStatus.FAILED:
failed_wfs[fqdn] = process.failed_reason or "Workflow failed without a reason"
else:
failed_wfs[fqdn] = f"Workflow status: {process.last_status}, last step: {process.last_step}"
except FormValidationError as e:
failed_wfs[fqdn] = f"Validation error: {e}"
except Exception as e: # noqa: BLE001
failed_wfs[fqdn] = f"Unexpected error: {e}"
oss = settings.load_oss_params()
callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}"
msg = f"[provisioning proxy] Callback URL set to {callback_url}"
logger.debug(msg)
payload = {
"failed_wfs": failed_wfs,
"successful_wfs": successful_wfs,
}
try:
response = requests.post(callback_url, json=payload, timeout=30)
if not response.ok:
logger.error(
"Callback failed",
extra={
"status_code": response.status_code,
"response_text": response.text,
"callback_url": callback_url,
"failed_wfs": failed_wfs,
"selected_routers": selected_routers,
},
)
except Exception as e:
msg = f"Failed to post callback: {e}"
logger.exception(msg)
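For reference, the body that this task POSTs back (and that later re-enters the parent workflow as callback_result) is a JSON object keyed by router FQDN, roughly like the following; the hostnames are purely illustrative:

    {
        "failed_wfs": {
            "rt1.example.geant.net": "Timed out waiting for workflow to complete"
        },
        "successful_wfs": {
            "rt2.example.geant.net": "Done"
        }
    }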
"""Helper methods that are used across GSO."""
import logging
import random
import re
import time
from ipaddress import IPv4Network, IPv6Network
from typing import TYPE_CHECKING, TypeAlias, cast
from uuid import UUID
from orchestrator.db import ProcessTable
from orchestrator.types import SubscriptionLifecycle
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice
@@ -15,6 +18,7 @@ from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_all_partners
from gso.services.processes import get_stopped_process_by_id
from gso.services.subscriptions import (
get_active_edge_port_subscriptions,
get_active_router_subscriptions,
@@ -30,6 +34,8 @@ from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType
from gso.utils.types.virtual_identifiers import VC_ID
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
@@ -395,3 +401,32 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non
return VC_ID(vc_id)
return None
def wait_for_workflow_to_stop(
process_id: UUIDstr | UUID,
check_interval: int,
max_retries: int,
) -> ProcessTable | None:
"""Waits until any step in the workflow reaches a terminal status.
Times out after max_retries * check_interval seconds.
:param process_id: ID of the workflow process
:param check_interval: Seconds between checks
:param max_retries: Max number of retries before giving up
:return: process object if it has stopped, None if it timed out
"""
for attempt in range(max_retries):
if process := get_stopped_process_by_id(process_id):
msg = f"✅ Process {process_id} has stopped with status: {process.last_status}"
logger.info(msg)
return process
msg = f"⏳ Attempt {attempt + 1}/{max_retries}: Waiting for workflow to progress..."
logger.info(msg)
time.sleep(check_interval)
msg = f"❌ Timeout reached. Workflow {process_id} did not stop after {max_retries * check_interval} seconds."
logger.error(msg)
return None
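With the arguments used by the Celery task above (check_interval=5, max_retries=60), each per-router workflow gets a budget of 5 × 60 = 300 seconds before it is reported back as "Timed out waiting for workflow to complete".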
@@ -84,6 +84,7 @@ celery = OrchestratorWorker(
"gso.schedules.clean_old_tasks",
"orchestrator.services.tasks",
"gso.tasks.start_process",
"gso.tasks.massive_redeploy_base_config",
],
)
@@ -17,7 +17,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr
from gso.products.product_types.router import Router
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import anonymous_lso_interaction, lso_interaction
from gso.utils.types.tt_number import TTNumber
from gso.utils.workflow_steps import deploy_base_config_dry, deploy_base_config_real
@@ -47,13 +47,15 @@ def redeploy_base_config() -> StepList:
* Redeploy base config
"""
is_not_massive_redeploy = conditional(lambda state: not bool(state.get("is_massive_redeploy")))
is_massive_redeploy = conditional(lambda state: bool(state.get("is_massive_redeploy")))
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> is_not_massive_redeploy(lso_interaction(deploy_base_config_dry))
>> lso_interaction(deploy_base_config_real)
>> is_not_massive_redeploy(lso_interaction(deploy_base_config_real))
>> is_massive_redeploy(anonymous_lso_interaction(deploy_base_config_real))
>> resync
>> done
)
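The two conditionals are complementary: a regular operator-driven run keeps the dry run and the interactive lso_interaction, while a run flagged with is_massive_redeploy skips the dry run and executes the playbook through anonymous_lso_interaction, presumably so the bulk task is not blocked waiting for an operator to confirm each result.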
@@ -7,19 +7,18 @@ input by the operator. The operator can then
import json
from typing import Annotated
import requests
from annotated_types import Len
from orchestrator.config.assignee import Assignee
from orchestrator.forms import SubmitFormPage
from orchestrator.targets import Target
from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow
from orchestrator.workflow import StepList, callback_step, conditional, done, init, inputstep, step, workflow
from pydantic import AfterValidator, ConfigDict
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import LongText, validate_unique_list
from requests import HTTPError
from gso.products.product_types.router import Router
from gso.services.subscriptions import get_active_router_subscriptions
from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task
from gso.utils.helpers import active_nokia_router_selector
from gso.utils.shared_enums import Vendor
from gso.utils.types.tt_number import TTNumber
@@ -45,40 +44,34 @@ def _input_form_generator() -> FormGenerator:
selected_routers: router_selection_list = all_active_nokia_routers # type: ignore[valid-type]
user_input = yield RedeployBaseConfigForm
return user_input.model_dump()
@step("Start running redeploy workflows on selected routers")
def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr]) -> State:
"""Loop over all selected routers, and try to start a new workflow for each of them."""
wf_errors = {}
workflow_url = "http://localhost:8080/api/processes/redeploy_base_config"
for selected_router in selected_routers:
try:
result = requests.post(
workflow_url,
json=[{"subscription_id": selected_router}, {"tt_number": tt_number, "is_massive_redeploy": True}],
timeout=10,
)
result.raise_for_status()
except HTTPError as e:
if e.response.json()["validation_errors"]:
error_message = e.response.json()["validation_errors"][0]["msg"]
else:
error_message = e.response.json()
wf_errors[Router.from_subscription(selected_router).router.router_fqdn] = error_message
return {"wf_errors": wf_errors}
@inputstep("Some workflows have failed to start", assignee=Assignee.SYSTEM)
def workflows_failed_to_start_prompt(wf_errors: list) -> FormGenerator:
return user_input.model_dump() | {"failed_wfs": {}, "successful_wfs": {}}
@step("Start worker to redeploy base config on selected routers")
def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr], callback_route: str) -> State:
"""Start the massive redeploy base config task with the selected routers."""
# TODO: if UUIDstr is ever changed to UUID here, convert the IDs to strings before passing them to the task
massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5)
return {"failed_wfs": {}, "successful_wfs": {}}
@step("Evaluate provisioning proxy result")
def evaluate_results(callback_result: dict) -> State:
"""Evaluate the result of the provisioning proxy callback."""
failed_wfs = callback_result.pop("failed_wfs", {})
successful_wfs = callback_result.pop("successful_wfs", {})
return {"callback_result": callback_result, "failed_wfs": failed_wfs, "successful_wfs": successful_wfs}
@inputstep("Some workflows have failed", assignee=Assignee.SYSTEM)
def workflows_failed_to_prompt(failed_wfs: dict, successful_wfs: dict) -> FormGenerator:
"""Prompt the operator that some workflows have failed to start."""
class WFFailurePrompt(SubmitFormPage):
model_config = ConfigDict(title="Some redeploy workflows have failed to start, please inspect the list below")
failed_to_start: LongText = json.dumps(wf_errors, indent=4)
model_config = ConfigDict(title="Some redeploy workflows have failed, please inspect the list below")
failed_workflows: LongText = json.dumps(failed_wfs, indent=4)
successful_workflows: LongText = json.dumps(successful_wfs, indent=4)
yield WFFailurePrompt
return {}
@@ -87,6 +80,15 @@ def workflows_failed_to_start_prompt(wf_errors: list) -> FormGenerator:
@workflow("Redeploy base config on multiple routers", initial_input_form=_input_form_generator, target=Target.SYSTEM)
def task_redeploy_base_config() -> StepList:
"""Gather a list of routers from the operator to redeploy base config onto."""
some_failed_to_start = conditional(lambda state: state["wf_errors"])
return init >> start_redeploy_workflows >> some_failed_to_start(workflows_failed_to_start_prompt) >> done
some_failed_to_start = conditional(lambda state: len(state.get("failed_wfs", {})) > 0)
return (
init
>> callback_step(
name="Start running redeploy workflows on selected routers",
action_step=start_redeploy_workflows,
validate_step=evaluate_results,
)
>> some_failed_to_start(workflows_failed_to_prompt)
>> done
)
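Structurally, the callback_step ties the two halves together: its action step (start_redeploy_workflows) is handed a callback_route in the workflow state and dispatches the Celery task, the workflow then suspends until that route receives a POST, and the posted body is passed to the validate step (evaluate_results) as callback_result. Only then does the some_failed_to_start conditional decide whether to surface the failure prompt to the operator.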