Newer
Older
"""The Provisioning Proxy service, which interacts with LSO running externally.
LSO is responsible for executing Ansible playbooks, that deploy subscriptions.
"""
Karel van Klink
committed
from typing import NoReturn
from orchestrator.config.assignee import Assignee
from orchestrator.domain import SubscriptionModel
from orchestrator.forms import FormPage, ReadOnlyField
from orchestrator.forms.validators import Accept, Label, LongText
from orchestrator.types import FormGenerator, State, UUIDstr, strEnum
from orchestrator.utils.json import json_dumps
Karel van Klink
committed
from orchestrator.workflow import Step, StepList
Karel van Klink
committed
from pydantic import validator
from gso import settings
from gso.products.product_types.device import DeviceProvisioning
from gso.products.product_types.iptrunk import Iptrunk, IptrunkProvisioning
Karel van Klink
committed
# Text shown on the waiting form while the provisioning proxy is still busy; used as the default value of the
# ``label_text`` argument of ``await_pp_results``.
DEFAULT_LABEL = "Provisioning proxy is running. Please come back later for the results."
"""Enumerator for different C(R)UD operations that the provisioning proxy supports.
Read isn't applicable, hence these become CUD and not CRUD operations.
def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation) -> None:
    """Send a request to LSO. The callback address is derived using the process ID provided.

    :param str endpoint: The LSO-specific endpoint to call, depending on the type of service object that's acted upon.
    :param dict parameters: JSON body for the request, which will almost always at least consist of a subscription
        object, and a boolean value to indicate a dry run. The dictionary passed in is not modified.
    :param UUIDstr process_id: The process ID that this request is a part of, used to call back to when the execution
        of the playbook is completed.
    :param :class:`CUDOperation` operation: The specific operation that's performed with the request.
    :raises ValueError: If ``operation`` is not one of the POST, PUT or DELETE operations.
    :raises AssertionError: If no provisioning proxy settings are configured, or if LSO answers with an error status.
        In the latter case the response body is used as the exception argument.
    """
    oss = settings.load_oss_params()
    pp_params = oss.PROVISIONING_PROXY
    # Raised explicitly rather than with ``assert`` so the check also runs under ``python -O``.
    if not pp_params:
        raise AssertionError("No provisioning proxy settings are configured.")

    # LSO calls this URL once the playbook has finished, which resumes the waiting process.
    callback_url = f"{oss.GENERAL.public_hostname}/api/processes/{process_id}/resume"
    logger.debug("[provisioning proxy] provisioning for process %s", process_id)

    # Build a new body instead of updating ``parameters`` in place, so the caller's dictionary is left untouched.
    payload = {**parameters, "callback": callback_url}
    url = f"{pp_params.scheme}://{pp_params.api_base}/api/{endpoint}"

    # Exactly one HTTP verb is used per request, selected by the requested operation.
    senders = {
        CUDOperation.POST: requests.post,
        CUDOperation.PUT: requests.put,
        CUDOperation.DELETE: requests.delete,
    }
    send = senders.get(operation)
    if send is None:
        raise ValueError(f"Unsupported provisioning proxy operation: {operation!r}")

    response = send(url, json=payload, timeout=10000)

    # NOTE(review): any non-error status (< 400) is treated as success here; tighten this to a specific status
    # code if LSO guarantees one.
    if not response.ok:
        raise AssertionError(response.content)
def provision_device(subscription: DeviceProvisioning, process_id: UUIDstr, dry_run: bool = True) -> None:
    """Provision a device by sending a POST request for it to the ``device`` endpoint of LSO.

    :param :class:`DeviceProvisioning` subscription: The subscription object that's to be provisioned.
    :param UUIDstr process_id: The related process ID, used for callback.
    :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``.
    """
    # Round-trip through the orchestrator's JSON encoder to turn the subscription into plain JSON-compatible data.
    serialised_subscription = json.loads(json_dumps(subscription))
    request_body = {
        "dry_run": dry_run,
        "subscription": serialised_subscription,
    }

    _send_request("device", request_body, process_id, CUDOperation.POST)
def provision_ip_trunk(
    subscription: IptrunkProvisioning, process_id: UUIDstr, config_object: str, dry_run: bool = True
) -> None:
    """Provision an IP trunk service using LSO.

    A POST request with the ``deploy`` verb is sent to the ``ip_trunk`` endpoint.

    :param :class:`IptrunkProvisioning` subscription: The subscription object that's to be provisioned.
    :param UUIDstr process_id: The related process ID, used for callback.
    :param str config_object: The type of object that's deployed
    :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``.
    """
    # Round-trip through the orchestrator's JSON encoder to turn the subscription into plain JSON-compatible data.
    serialised_subscription = json.loads(json_dumps(subscription))
    request_body = {
        "subscription": serialised_subscription,
        "dry_run": dry_run,
        "verb": "deploy",
        "object": config_object,
    }

    _send_request("ip_trunk", request_body, process_id, CUDOperation.POST)
# new_subscription: Iptrunk,
# process_id: UUIDstr,
# dry_run: bool = True):
# """
# Function that modifies an existing IP trunk subscription using LSO.
#
# :param :class:`Iptrunk` old_subscription: The subscription object, before
# its modification.
# :param :class:`Iptrunk` new_subscription: The subscription object, after
# modifications have been made to it.
# :param UUIDstr process_id: The related process ID, used for callback.
# :param bool dry_run: A boolean indicating whether this should be a dry run
# or not, defaults to ``True``.
# """
# parameters = {
# 'dry_run': dry_run,
# 'old_subscription': old_subscription,
# 'subscription': new_subscription
# }
#
# _send_request('ip_trunk', parameters, process_id, CUDOperation.PUT)
def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, dry_run: bool = True) -> None:
    """Deprovision an IP trunk service using LSO.

    A DELETE request with the ``terminate`` verb is sent to the ``ip_trunk`` endpoint.

    :param :class:`Iptrunk` subscription: The subscription object that's to be deprovisioned.
    :param UUIDstr process_id: The related process ID, used for callback.
    :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``.
    """
    # Round-trip through the orchestrator's JSON encoder to turn the subscription into plain JSON-compatible data.
    serialised_subscription = json.loads(json_dumps(subscription))
    request_body = {
        "subscription": serialised_subscription,
        "dry_run": dry_run,
        "verb": "terminate",
    }

    _send_request("ip_trunk", request_body, process_id, CUDOperation.DELETE)
@inputstep("Await provisioning proxy results", assignee=Assignee("SYSTEM"))
def await_pp_results(subscription: SubscriptionModel, label_text: str = DEFAULT_LABEL) -> FormGenerator:
    """Input step that blocks the workflow until provisioning proxy run results are submitted.

    The form shows a label and requires a ``pp_run_results`` value; submitting the form without one is rejected.
    Presumably the results are supplied by LSO through the callback URL it was given - confirm against the caller.

    :param :class:`SubscriptionModel` subscription: The subscription being deployed, its product name is used in the
        title of the form.
    :param str label_text: The text shown on the form while waiting, defaults to ``DEFAULT_LABEL``.
    :return: The submitted form contents as a dictionary, which includes ``pp_run_results``.
    """

    class ProvisioningResultPage(FormPage):
        class Config:
            title = f"Deploying {subscription.product.name}..."

        warning_label: Label = label_text  # type: ignore
        # Defaults to ``None`` so that the validator below is what rejects a submission without results.
        pp_run_results: dict = None  # type: ignore

        @validator("pp_run_results", allow_reuse=True, pre=True, always=True)
        def run_results_must_be_given(cls, run_results: dict) -> dict | NoReturn:
            # ``always=True`` makes this run even when the field was left out of the submitted form.
            if run_results is not None:
                return run_results

            raise ValueError("Run results may not be empty. Wait for the provisioning proxy to finish.")

    submitted_page = yield ProvisioningResultPage

    return submitted_page.dict()
Karel van Klink
committed
@step("Reset Provisioning Proxy state")
def reset_pp_success_state() -> State:
    """Mark the provisioning proxy interaction as not (yet) successful.

    :return: A state update that sets ``pp_did_succeed`` to ``False``.
    """
    cleared_state: State = {"pp_did_succeed": False}
    return cleared_state
@inputstep("Confirm provisioning proxy results", assignee=Assignee("SYSTEM"))
def confirm_pp_results(state: State) -> FormGenerator:
    """Input step that presents the provisioning proxy run results for confirmation.

    The run counts as successful when its ``return_code`` equals zero. On failure an extra label is added to the
    form, announcing that the execution will be retried.

    :param State state: The workflow state, read for ``pp_run_results`` (``return_code``, ``status`` and ``output``)
        and for the product name of the subscription.
    :return: A state update with ``pp_did_succeed`` set to whether the run was successful.
    """
    pp_results = state["pp_run_results"]
    successful_run = pp_results["return_code"] == 0
    product_name = state["subscription"]["product"]["name"]

    class ConfirmRunPage(FormPage):
        class Config:
            title = f"Execution for {product_name} completed, please confirm the results below."

        run_status: str = ReadOnlyField(pp_results["status"])
        run_results: LongText = ReadOnlyField(f"{pp_results['output']}")

        # The retry notice only becomes part of the form when the playbook run has failed.
        if not successful_run:
            pp_retry_label: Label = (
                "Provisioning Proxy playbook execution failed, it will be retried (up to two times)."  # type: ignore
            )

    # The submitted form itself carries no input that is needed further on, only the confirmation matters.
    yield ConfirmRunPage

    return {"pp_did_succeed": successful_run}
def pp_interaction(provisioning_step: Step) -> StepList:
    """Combine a provisioning step with the steps that await and confirm its results.

    Each of the three steps is wrapped in a condition on the state: the condition holds only while
    ``pp_did_succeed`` is missing or falsy.

    :param :class:`Step` provisioning_step: The step that starts the interaction with the provisioning proxy.
    :return: The provisioning step, followed by ``await_pp_results`` and ``confirm_pp_results``, each wrapped in the
        condition described above.
    """

    def _pp_has_not_succeeded(state: State) -> bool:
        return not state.get("pp_did_succeed")

    should_retry_pp_steps = conditional(_pp_has_not_succeeded)

    provision_and_await = should_retry_pp_steps(provisioning_step) >> should_retry_pp_steps(await_pp_results)
    return provision_and_await >> should_retry_pp_steps(confirm_pp_results)