Skip to content
Snippets Groups Projects
provisioning_proxy.py 14.3 KiB
Newer Older
"""The Provisioning Proxy service, which interacts with {term}`LSO` running externally.

{term}`LSO` is responsible for executing Ansible playbooks that deploy subscriptions.
"""
import logging
Karel van Klink's avatar
Karel van Klink committed
from orchestrator import conditional, inputstep, step
from orchestrator.config.assignee import Assignee
from orchestrator.domain import SubscriptionModel
from orchestrator.forms import FormPage, ReadOnlyField
from orchestrator.forms.validators import Accept, Label, LongText
from orchestrator.types import FormGenerator, State, UUIDstr, strEnum
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import Step, StepList, abort
from gso.products.product_types.iptrunk import Iptrunk, IptrunkProvisioning
from gso.products.product_types.router import Router, RouterProvisioning
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
DEFAULT_LABEL = "Provisioning proxy is running. Please come back later for the results."
"""The default label displayed when the provisioning proxy is running, in case no custom label is provided."""
class CUDOperation(strEnum):
    """Enumerator for different {term}`CRUD` operations that the provisioning proxy supports.

    Read isn't applicable, hence the missing R. The value of every member is equal to its name.
    """

    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation) -> None:
    """Send a request to {term}`LSO`. The callback address is derived using the process ID provided.

    The given ``parameters`` are left untouched; the callback URL is added to a copy that is used as request body.

    :param endpoint: The {term}`LSO`-specific endpoint to call, depending on the type of service object that's acted
        upon.
    :type endpoint: str
    :param parameters: JSON body for the request, which will almost always at least consist of a subscription object,
        and a boolean value to indicate a dry run.
    :type parameters: dict
    :param process_id: The process ID that this request is a part of, used to call back to when the execution of the
        playbook is completed.
    :type process_id: UUIDstr
    :param operation: The specific operation that's performed with the request.
    :type operation: {class}`CUDOperation`
    :raises AssertionError: When no provisioning proxy settings are available, or when the response status code is
        anything other than 200.
    :raises ValueError: When ``operation`` is not one of the supported operations.
    :rtype: None
    """
    oss = settings.load_oss_params()
    pp_params = oss.PROVISIONING_PROXY
    if not pp_params:
        # Raised explicitly instead of using ``assert``, such that the check is not stripped when running with ``-O``.
        raise AssertionError("No provisioning proxy settings are available.")

    # Build up a callback URL of the Provisioning Proxy to return its results to.
    callback_url = f"{oss.GENERAL.public_hostname}/api/processes/{process_id}/resume"
    logger.debug("[provisioning proxy] provisioning for process %s", process_id)
    logger.debug("[provisioning proxy] Callback URL set to %s", callback_url)

    # Work on a copy, such that the dictionary owned by the caller isn't modified as a side effect.
    payload = {**parameters, "callback": callback_url}
    url = f"{pp_params.scheme}://{pp_params.api_base}/api/{endpoint}"

    # NOTE(review): requests interprets ``timeout`` in seconds, making this close to three hours - confirm that this is
    # the intended value.
    request_timeout = 10000

    # Fire off the request, depending on the operation type.
    if operation == CUDOperation.POST:
        response = requests.post(url, json=payload, timeout=request_timeout)
    elif operation == CUDOperation.PUT:
        response = requests.put(url, json=payload, timeout=request_timeout)
    elif operation == CUDOperation.DELETE:
        response = requests.delete(url, json=payload, timeout=request_timeout)
    else:
        # Without this branch, an unknown operation would surface as an ``AttributeError`` on ``None`` further down.
        raise ValueError(f"Unsupported provisioning proxy operation: {operation!r}")

    if response.status_code != 200:
        logger.debug(response.content)
        raise AssertionError(response.content)
def provision_router(subscription: RouterProvisioning, process_id: UUIDstr, dry_run: bool = True) -> None:
    """Provision a new router using {term}`LSO`.

    :param subscription: The subscription object that's to be provisioned.
    :type subscription: {class}`RouterProvisioning`
    :param process_id: The related process ID, used for callback.
    :type process_id: UUIDstr
    :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
    :type dry_run: bool
    :rtype: None
    """
    # The subscription is serialised with ``json_dumps`` and parsed again, to end up with plain JSON-compatible data.
    parameters = {"dry_run": dry_run, "subscription": json.loads(json_dumps(subscription))}

    _send_request("router", parameters, process_id, CUDOperation.POST)
def provision_ip_trunk(
    subscription: IptrunkProvisioning, process_id: UUIDstr, config_object: str, dry_run: bool = True
) -> None:
    """Provision an IP trunk service using {term}`LSO`.

    :param subscription: The subscription object that's to be provisioned.
    :type subscription: {class}`IptrunkProvisioning`
    :param process_id: The related process ID, used for callback.
    :type process_id: UUIDstr
    :param config_object: The type of object that's deployed.
    :type config_object: str
    :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
    :type dry_run: bool
    :rtype: None
    """
    parameters = {
        # The subscription is serialised with ``json_dumps`` and parsed again, to end up with JSON-compatible data.
        "subscription": json.loads(json_dumps(subscription)),
        "dry_run": dry_run,
        "verb": "deploy",
        "object": config_object,
    }

    _send_request("ip_trunk", parameters, process_id, CUDOperation.POST)

def check_ip_trunk(subscription: IptrunkProvisioning, process_id: UUIDstr, check_name: str) -> None:
    """Perform a check on an IP trunk service using {term}`LSO`.

    :param subscription: The subscription object that the check is executed for.
    :type subscription: {class}`IptrunkProvisioning`
    :param process_id: The related process ID, used for callback.
    :type process_id: UUIDstr
    :param check_name: The name of the check to execute
    :type check_name: str
    :rtype: None
    """
    parameters = {
        # The subscription is serialised with ``json_dumps`` and parsed again, to end up with JSON-compatible data.
        "subscription": json.loads(json_dumps(subscription)),
        "check_name": check_name,
    }

    _send_request("ip_trunk/perform_check", parameters, process_id, CUDOperation.POST)
def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, dry_run: bool = True) -> None:
    """Deprovision an IP trunk service using {term}`LSO`.

    :param subscription: The subscription object that's to be deprovisioned.
    :type subscription: {class}`Iptrunk`
    :param process_id: The related process ID, used for callback.
    :type process_id: UUIDstr
    :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
    :type dry_run: bool
    :rtype: None
    """
    # The subscription is serialised with ``json_dumps`` and parsed again, to end up with plain JSON-compatible data.
    parameters = {"subscription": json.loads(json_dumps(subscription)), "dry_run": dry_run, "verb": "terminate"}

    _send_request("ip_trunk", parameters, process_id, CUDOperation.DELETE)
def migrate_ip_trunk(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[str],
    replace_index: int,
    process_id: UUIDstr,
    verb: str,
    dry_run: bool = True,
) -> None:
    """Ask {term}`LSO` to migrate one side of an IP trunk service to another node.

    The request body holds the existing trunk subscription, a description of the side that replaces one of the
    current sides, the verb for the playbook, and the dry run flag.

    :param subscription: The subscription object that's to be migrated.
    :type subscription: {class}`Iptrunk`
    :param new_node: The new node that is being migrated to
    :type new_node: {class}`Router`
    :param new_lag_interface: The name of the new aggregated Ethernet interface
    :type new_lag_interface: str
    :param new_lag_member_interfaces: The new list of interfaces that are part of the LAG
    :type new_lag_member_interfaces: list[str]
    :param replace_index: The index of the side that is going to be replaced as part of the existing trunk,
                          can be `0` or `1`.
    :type replace_index: int
    :param process_id: The related process ID, used for callback.
    :type process_id: UUIDstr
    :param verb: The verb that is passed to the executed playbook
    :type verb: str
    :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
    :type dry_run: bool
    :rtype: None
    """
    # Both objects are serialised with ``json_dumps`` and parsed again, to end up with plain JSON-compatible data.
    trunk_body = json.loads(json_dumps(subscription))
    node_body = json.loads(json_dumps(new_node))

    replacement_side = {
        "new_node": node_body,
        "new_lag_interface": new_lag_interface,
        "new_lag_member_interfaces": new_lag_member_interfaces,
        "replace_index": replace_index,
    }
    request_body = {"subscription": trunk_body, "new_side": replacement_side, "verb": verb, "dry_run": dry_run}

    _send_request("ip_trunk/migrate", request_body, process_id, CUDOperation.POST)


@inputstep("Await provisioning proxy results", assignee=Assignee("SYSTEM"))
def _await_pp_results(subscription: SubscriptionModel, label_text: str = DEFAULT_LABEL) -> FormGenerator:
    """Input step that forces the workflow to go into a `SUSPENDED` state.

    When the workflow is `SUSPENDED`, it will wait for user input to be presented before it continues running the next
    steps of the workflow. User input is mimicked by the provisioning proxy, as it makes a `PUT` request to the callback
    URL that it was given in `_send_request()`. This input is fabricated in such a way that it will advance the workflow
    to the next step. This next step should always be `confirm_pp_results()`, where the operator is presented with the
    output of the provisioning proxy.

    :param subscription: The current subscription that the provisioning proxy is acting on.
    :type subscription: {class}`orchestrator.domain.SubscriptionModel`
    :param label_text: A label that's displayed to the operator when the provisioning proxy hasn't returned its
        results yet. Defaults to `DEFAULT_LABEL`.
    :type label_text: str
    :return: The input that's given by the provisioning proxy, that should contain run results, and a `confirm`
        boolean set to `True`.
    :rtype: {class}`orchestrator.types.FormGenerator`
    """

    class ProvisioningResultPage(FormPage):
        class Config:
            title = f"Deploying {subscription.product.name}..."

        warning_label: Label = label_text  # type: ignore
        pp_run_results: dict = None  # type: ignore
        confirm: Accept = Accept("INCOMPLETE")

        @validator("pp_run_results", allow_reuse=True, pre=True, always=True)
        def run_results_must_be_given(cls, run_results: dict) -> dict | NoReturn:
            # The field defaults to ``None``; reject the form for as long as no (non-empty) run results are submitted.
            if not run_results:
                raise ValueError("Run results may not be empty. Wait for the provisioning proxy to finish.")

            return run_results

    result_page = yield ProvisioningResultPage

    return result_page.dict()


def _reset_pp_success_state() -> State:
    """Reset the boolean that indicates a successful provisioning proxy result in the state of a running workflow.

    :return: A new state of the workflow, where the key `pp_did_succeed` has been (re)set to false.
    :rtype: {class}`orchestrator.types.State`
    """
@inputstep("Confirm provisioning proxy results", assignee=Assignee("SYSTEM"))
def _confirm_pp_results(state: State) -> FormGenerator:
    """Input step where a human has to confirm the result from calling provisioning proxy.

    The results of a call to the provisioning proxy are displayed, together with the fact whether this execution was
    a success or not. An execution counts as successful when the return code in the run results equals zero. If
    unsuccessful, extra labels are displayed that warn the user about the failure, and about the fact that submitting
    the form results in a retry. The amount of retries follows from the `attempts` argument of `pp_interaction()`.

    :param state: The current state of the workflow.
    :type state: {class}`orchestrator.types.State`
    :return: Confirmation from the user, when presented with the run results.
    :rtype: {class}`orchestrator.types.FormGenerator`
    """
    if "pp_run_results" not in state:
        # FIXME: dirty hack that makes the skipping """work"""
        return {"pp_did_succeed": True}

    successful_run = state["pp_run_results"]["return_code"] == 0

    class ConfirmRunPage(FormPage):
        class Config:
            title = (
                f"Execution for {state['subscription']['product']['name']} completed, please confirm the results below."
            )

        # Fields in a class body are declared conditionally: the warning labels only exist for a failed execution.
        if not successful_run:
            pp_retry_label1: Label = (
                "Provisioning Proxy - playbook execution failed: inspect the output before proceeding"  # type: ignore
            )

        run_status: str = ReadOnlyField(state["pp_run_results"]["status"])
        run_results: LongText = ReadOnlyField(json.dumps(state["pp_run_results"]["output"], indent=4))

        if not successful_run:
            pp_retry_label: Label = (
                "Click submit to retry. Otherwise, abort the workflow from the process tab."  # type: ignore
            )

    yield ConfirmRunPage

    return {"pp_did_succeed": successful_run}


def pp_interaction(provisioning_step: Step, attempts: int, abort_on_failure: bool = True) -> StepList:
    """Interaction with the provisioning proxy.

    This method returns a list of steps that starts with a reset of the `pp_did_succeed` flag, followed by the three
    steps that make up one interaction with the provisioning proxy, repeated `attempts` times:
        - The provisioning step itself, given by the user as input.
        - The input step that suspends the workflow, and will wait for results from the provisioning proxy.
        - An input step that presents the user with the results, where they must be confirmed.

    All these steps are wrapped in a {class}`orchestrator.workflow.conditional`. This ensures that when provisioning was
    already successful, these steps are skipped. This mechanism is quite a dirty hack, and it's planned to be addressed.

    The parameter `attempts` indicates how many times a provisioning may be attempted. When this amount is exceeded, and
    it's still not successful, the workflow will be aborted if so indicated with the `abort_on_failure` boolean.

    :param provisioning_step: The step that executes an interaction with the provisioning proxy.
    :type provisioning_step: {class}`orchestrator.workflow.Step`
    :param attempts: The maximum amount of times that a provisioning can be retried.
    :type attempts: int
    :param abort_on_failure: A boolean value that indicates whether a workflow should abort if the provisioning has
                             failed the maximum amount of tries. Defaults to `True`.
    :type abort_on_failure: bool
    :return: The list of steps that form the interaction with the provisioning proxy, including all retries.
    :rtype: {class}`orchestrator.workflow.StepList`
    """
    # Steps wrapped in this conditional only run for as long as ``pp_did_succeed`` is missing or falsy in the state.
    should_retry_pp_steps = conditional(lambda state: not state.get("pp_did_succeed"))
    pp_steps = StepList([_reset_pp_success_state])

    for _ in range(attempts):
        pp_steps >>= (
            should_retry_pp_steps(provisioning_step)
            >> should_retry_pp_steps(_await_pp_results)
            >> should_retry_pp_steps(_confirm_pp_results)
        )

    if abort_on_failure:
        # Abort a workflow if provisioning has failed too many times
        pp_steps >>= should_retry_pp_steps(abort)

    return pp_steps