import json import logging import requests from orchestrator import inputstep from orchestrator.config.assignee import Assignee # noinspection PyProtectedMember from orchestrator.forms import FormPage, ReadOnlyField from orchestrator.forms.validators import Accept, Label, LongText from orchestrator.types import UUIDstr, State, strEnum from orchestrator.utils.json import json_dumps from gso import settings from gso.products.product_types.device import DeviceProvisioning from gso.products.product_types.iptrunk import IptrunkProvisioning, Iptrunk logger = logging.getLogger(__name__) class CUDOperation(strEnum): """ Enum for different C(R)UD operations that the provisioning proxy supports. Read is not applicable, hence these become CUD and not CRUD operations. """ #: Creation is done with a POST request POST = 'POST' #: Updating is done with a PUT request PUT = 'PUT' #: Removal is done with a DELETE request DELETE = 'DELETE' def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation): """ Internal function for sending a request to LSO. The callback address is derived using the process ID provided. :param str endpoint: The LSO-specific endpoint to call, depending on the type of service object that is acted upon. :param dict parameters: JSON body for the request, which will almost always at least consist of a subscription object, and a boolean value to indicate a dry run. :param UUIDstr process_id: The process ID that this request is a part of, used to call back to when the execution of the playbook is completed. :param :class:`CUDOperation` operation: The specific operation that is performed with the request. """ oss = settings.load_oss_params() pp_params = oss.PROVISIONING_PROXY assert pp_params callback_url = f'{settings.load_oss_params().GENERAL.public_hostname}' \ f'/api/processes/{process_id}/resume' logger.debug(f'[provisioning proxy] provisioning for process {process_id}') parameters.update({'callback': callback_url}) url = f'{pp_params.scheme}://{pp_params.api_base}/api/{endpoint}' request = None if operation == CUDOperation.POST: request = requests.post(url, json=parameters) elif operation == CUDOperation.PUT: request = requests.put(url, json=parameters) elif operation == CUDOperation.DELETE: request = requests.delete(url, json=parameters) if request.status_code != 200: raise AssertionError(request.text) def provision_device( subscription: DeviceProvisioning, process_id: UUIDstr, dry_run: bool = True): """ Function that provisions a new device using LSO. :param :class:`DeviceProvisioning` subscription: The subscription object that is to be provisioned. :param UUIDstr process_id: The related process ID, used for callback. :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``. """ parameters = { 'dry_run': dry_run, 'subscription': json.loads(json_dumps(subscription)) } _send_request('device', parameters, process_id, CUDOperation.POST) def provision_ip_trunk(subscription: IptrunkProvisioning, process_id: UUIDstr, config_object: str, dry_run: bool = True): """ Function that provisions an IP trunk service using LSO. :param :class:`IptrunkProvisioning` subscription: The subscription object that is to be provisioned. :param UUIDstr process_id: The related process ID, used for callback. :param str config_object: The type of object that is deployed :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``. """ parameters = { 'subscription': json.loads(json_dumps(subscription)), 'dry_run': dry_run, 'verb': "deploy", 'object': config_object } _send_request('ip_trunk', parameters, process_id, CUDOperation.POST) # def modify_ip_trunk(old_subscription: Iptrunk, # new_subscription: Iptrunk, # process_id: UUIDstr, # dry_run: bool = True): # """ # Function that modifies an existing IP trunk subscription using LSO. # # :param :class:`Iptrunk` old_subscription: The subscription object, before # its modification. # :param :class:`Iptrunk` new_subscription: The subscription object, after # modifications have been made to it. # :param UUIDstr process_id: The related process ID, used for callback. # :param bool dry_run: A boolean indicating whether this should be a dry ryn # or not, defaults to ``True``. # """ # parameters = { # 'dry_run': dry_run, # 'old_subscription': old_subscription, # 'subscription': new_subscription # # FIXME missing parameters # } # # _send_request('ip_trunk', parameters, process_id, CUDOperation.PUT) def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, dry_run: bool = True): """ Function that provisions an IP trunk service using LSO. :param :class:`IptrunkProvisioning` subscription: The subscription object that is to be provisioned. :param UUIDstr process_id: The related process ID, used for callback. :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``. """ parameters = { 'subscription': json.loads(json_dumps(subscription)), 'dry_run': dry_run, 'verb': "remove" } _send_request('ip_trunk', parameters, process_id, CUDOperation.DELETE) @inputstep('Await provisioning proxy results', assignee=Assignee('SYSTEM')) def await_pp_results() -> State: class ProvisioningResultPage(FormPage): class Config: title = 'Do NOT click on confirm in this step!' warning_label: Label = 'This step relies on an external service to ' \ 'send an update to the orchestrator, do not ' \ 'interfere with this process please.' pp_run_results: dict = {'state': 'not_ready'} confirm: Accept = Accept('INCOMPLETE') result_page = yield ProvisioningResultPage return result_page.dict() @inputstep('Confirm provisioning proxy results', assignee=Assignee('SYSTEM')) def confirm_pp_results(state: State) -> State: class ConfirmRunPage(FormPage): class Config: title = 'Execution completed, please confirm the results.' run_status: str = ReadOnlyField(state['pp_run_results']['status']) run_results: LongText = ReadOnlyField( f"{state['pp_run_results']['output']}") confirm: Accept = Accept('INCOMPLETE') yield ConfirmRunPage return state