diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8239575e9c63ec888c7a5acdfc212b98b21d84b5 --- /dev/null +++ b/test/workflows/__init__.py @@ -0,0 +1,286 @@ +import difflib +import pprint +from copy import deepcopy +from itertools import chain, repeat +from typing import Callable, Dict, List, Optional, Tuple, Union, cast +from uuid import uuid4 + +import structlog + +from orchestrator.db import ProcessTable +from orchestrator.forms import post_process +from orchestrator.services.processes import StateMerger, _db_create_process +from orchestrator.types import State +from orchestrator.utils.json import json_dumps, json_loads +from orchestrator.workflow import Process as WFProcess +from orchestrator.workflow import ProcessStat, Step, Success, Workflow, runwf +from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow +from orchestrator.types import FormGenerator, InputForm + +logger = structlog.get_logger(__name__) + + +def _raise_exception(state): + if isinstance(state, Exception): + raise state + return state + + +def assert_success(result): + assert ( + result.on_failed(_raise_exception).on_waiting(_raise_exception).issuccess() + ), f"Unexpected process status. Expected Success, but was: {result}" + + +def assert_waiting(result): + assert result.on_failed( + _raise_exception + ).iswaiting(), f"Unexpected process status. Expected Waiting, but was: {result}" + + +def assert_suspended(result): + assert result.on_failed( + _raise_exception + ).issuspend(), f"Unexpected process status. Expected Suspend, but was: {result}" + + +def assert_aborted(result): + assert result.on_failed(_raise_exception).isabort(), f"Unexpected process status. Expected Abort, but was: {result}" + + +def assert_failed(result): + assert result.isfailed(), f"Unexpected process status. Expected Failed, but was: {result}" + + +def assert_complete(result): + assert result.on_failed( + _raise_exception + ).iscomplete(), f"Unexpected process status. Expected Complete, but was: {result}" + + +def assert_state(result, expected): + state = result.unwrap() + actual = {} + for key in expected.keys(): + actual[key] = state[key] + assert expected == actual, f"Invalid state. Expected superset of: {expected}, but was: {actual}" + + +def assert_state_equal(result: ProcessTable, expected: Dict, excluded_keys: Optional[List[str]] = None) -> None: + """Test state with certain keys excluded from both actual and expected state.""" + if excluded_keys is None: + excluded_keys = ["process_id", "workflow_target", "workflow_name"] + state = deepcopy(extract_state(result)) + expected_state = deepcopy(expected) + for key in excluded_keys: + if key in state: + del state[key] + if key in expected_state: + del expected_state[key] + + assert state == expected_state, "Unexpected state:\n" + "\n".join( + difflib.ndiff(pprint.pformat(state).splitlines(), pprint.pformat(expected_state).splitlines()) + ) + + +def assert_assignee(log, expected): + actual = log[-1][0].assignee + assert expected == actual, f"Unexpected assignee. Expected {expected}, but was: {actual}" + + +def assert_step_name(log, expected): + actual = log[-1][0] + assert actual.name == expected, f"Unexpected name. Expected {expected}, but was: {actual}" + + +def extract_state(result): + return result.unwrap() + + +def extract_error(result): + from orchestrator.workflow import Process + + assert isinstance(result, Process), f"Expected a Process, but got {repr(result)} of type {type(result)}" + assert not isinstance(result.s, Process), "Result contained a Process in a Process, this should not happen" + + return extract_state(result).get("error") + + +class WorkflowInstanceForTests(LazyWorkflowInstance): + """Register Test workflows. + + Similar to `LazyWorkflowInstance` but does not require an import during instantiate + Used for creating test workflows + """ + + package: str + function: str + is_callable: bool + + def __init__(self, workflow: Workflow, name: str) -> None: + self.workflow = workflow + self.name = name + + def __enter__(self): + ALL_WORKFLOWS[self.name] = self + + def __exit__(self, _exc_type, _exc_value, _traceback): + del ALL_WORKFLOWS[self.name] + + def instantiate(self) -> Workflow: + """Import and instantiate a workflow and return it. + + This can be as simple as merely importing a workflow function. However, if it concerns a workflow generating + function, that function will be called with or without arguments as specified. + + Returns: + A workflow function. + + """ + self.workflow.name = self.name + return self.workflow + + def __str__(self) -> str: + return self.name + + def __repr__(self) -> str: + return f"WorkflowInstanceForTests('{self.workflow}','{self.name}')" + + +def _store_step(step_log: List[Tuple[Step, WFProcess]]) -> Callable[[ProcessStat, Step, WFProcess], WFProcess]: + def __store_step(pstat: ProcessStat, step: Step, state: WFProcess) -> WFProcess: + try: + state = state.map(lambda s: json_loads(json_dumps(s))) + except Exception: + logger.exception("Step state is not valid json", state=state) + step_log.append((step, state)) + return state + + return __store_step + + +def _sanitize_input(input_data: Union[State, List[State]]) -> List[State]: + # To be backwards compatible convert single dict to list + if not isinstance(input_data, List): + input_data = [input_data] + + # We need a copy here and we want to mimic the actual code that returns a serialized version of the state + return cast(List[State], json_loads(json_dumps(input_data))) + + +def run_workflow(workflow_key: str, input_data: Union[State, List[State]]) -> Tuple[WFProcess, ProcessStat, List]: + # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.start_process` + # The main differences are: we use a different step log function and we don't run in + # a sepperate thread + user_data = _sanitize_input(input_data) + user = "john.doe" + + step_log: List[Tuple[Step, WFProcess]] = [] + + pid = uuid4() + workflow = get_workflow(workflow_key) + assert workflow, "Workflow does not exist" + initial_state = { + "process_id": pid, + "reporter": user, + "workflow_name": workflow_key, + "workflow_target": workflow.target, + } + + user_input = post_process(workflow.initial_input_form, initial_state, user_data) + + pstat = ProcessStat( + pid, workflow=workflow, state=Success({**user_input, **initial_state}), log=workflow.steps, current_user=user + ) + + _db_create_process(pstat) + + result = runwf(pstat, _store_step(step_log)) + + return result, pstat, step_log + + +def resume_workflow( + process: ProcessStat, step_log: List[Tuple[Step, WFProcess]], input_data: State +) -> Tuple[WFProcess, List]: + # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process` + # The main differences are: we use a different step log function and we don't run in + # a sepperate thread + user_data = _sanitize_input(input_data) + + persistent = list(filter(lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting()), step_log)) + nr_of_steps_done = len(persistent) + remaining_steps = process.workflow.steps[nr_of_steps_done:] + + _, current_state = step_log[-1] + + user_input = post_process(remaining_steps[0].form, current_state.unwrap(), user_data) + state = current_state.map(lambda state: StateMerger.merge(deepcopy(state), user_input)) + + updated_process = process.update(log=remaining_steps, state=state) + result = runwf(updated_process, _store_step(step_log)) + return result, step_log + + +def run_form_generator( + form_generator: FormGenerator, extra_inputs: Optional[List[State]] = None +) -> Tuple[List[dict], State]: + """Run a form generator to get the resulting forms and result. + + Warning! This does not run the actual pydantic validation on purpose. However you should + make sure that anything in extra_inputs matched the values and types as if the pydantic validation has + been ran. + + Args: + form_generator: A form generator + extra_inputs: Optional list of user input dicts for each page in the generator. + If no input is given for a page an empty dict is used. + The default value from the form is used as default value for a field. + + Returns: + A list of generated forms and the result state for the whole generator. + + Example: + Given the following form generator: + + >>> from pydantic_forms.core import FormPage + >>> def form_generator(state): + ... class TestForm(FormPage): + ... field: str = "foo" + ... user_input = yield TestForm + ... return {**user_input.dict(), "bar": 42} + + You can run this without extra_inputs + >>> forms, result = run_form_generator(form_generator({"state_field": 1})) + >>> forms + [{'title': 'unknown', 'type': 'object', 'properties': {'field': {'title': 'Field', 'default': 'foo', 'type': 'string'}}, 'additionalProperties': False}] + >>> result + {'field': 'foo', 'bar': 42} + + + Or with extra_inputs: + >>> forms, result = run_form_generator(form_generator({'state_field': 1}), [{'field':'baz'}]) + >>> forms + [{'title': 'unknown', 'type': 'object', 'properties': {'field': {'title': 'Field', 'default': 'foo', 'type': 'string'}}, 'additionalProperties': False}] + >>> result + {'field': 'baz', 'bar': 42} + + """ + forms: List[dict] = [] + result: State = {"s": 3} + if extra_inputs is None: + extra_inputs = [] + + try: + form = cast(InputForm, next(form_generator)) + forms.append(form.schema()) + for extra_input in chain(extra_inputs, repeat(cast(State, {}))): + user_input_data = {field_name: field.default for field_name, field in form.__fields__.items()} + user_input_data.update(extra_input) + user_input = form.construct(**user_input_data) + form = form_generator.send(user_input) + forms.append(form.schema()) + except StopIteration as stop: + result = stop.value + + return forms, result diff --git a/test/workflows/conftest.py b/test/workflows/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..c548aec6d49a4f07076389a0b4d665bf348b1c49 --- /dev/null +++ b/test/workflows/conftest.py @@ -0,0 +1,25 @@ +import pytest +from urllib3_mock import Responses + + +@pytest.fixture(autouse=True) +def responses(): + responses_mock = Responses("requests.packages.urllib3") + + def _find_request(call): + mock_url = responses_mock._find_match(call.request) + if not mock_url: + pytest.fail(f"Call not mocked: {call.request}") + return mock_url + + def _to_tuple(url_mock): + return (url_mock["url"], url_mock["method"], url_mock["match_querystring"]) + + with responses_mock: + yield responses_mock + + mocked_urls = map(_to_tuple, responses_mock._urls) + used_urls = map(_to_tuple, map(_find_request, responses_mock.calls)) + not_used = set(mocked_urls) - set(used_urls) + if not_used: + pytest.fail(f"Found unused responses mocks: {not_used}", pytrace=False) diff --git a/test/workflows/site/__init__.py b/test/workflows/site/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/test/workflows/site/test_create_site.py b/test/workflows/site/test_create_site.py new file mode 100644 index 0000000000000000000000000000000000000000..826fcdacf52bbe17fad82309f01fe6700fc6af3e --- /dev/null +++ b/test/workflows/site/test_create_site.py @@ -0,0 +1,47 @@ +import pytest +from orchestrator.db import ProductTable + +from gso.products.product_blocks.site import SiteTier +from gso.products.product_types.site import Site +from gso.services.crm import get_customer_by_name +from test.workflows import ( + assert_complete, + assert_failed, + extract_error, + extract_state, + run_workflow, + assert_suspended, + resume_workflow, +) + + +@pytest.mark.workflow +def test_create_site(responses, faker): + product_id = ProductTable.query.filter(ProductTable.name == "Site").one().product_id + initial_site_data = [ + {"product": product_id}, + { + "site_name": faker.name(), + "site_city": faker.city(), + "site_country": faker.country(), + "site_country_code": faker.country_code(), + "site_latitude": 1, + "site_longitude": 2, + "site_bgp_community_id": faker.pyint(), + "site_internal_id": faker.pyint(), + "site_tier": SiteTier.TIER1, + "site_ts_address": faker.ipv4(), + "customer": get_customer_by_name("GÉANT")["id"], + }, + ] + result, process, step_log = run_workflow("create_site", initial_site_data) + assert_complete(result) + + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = Site.from_subscription(subscription_id) + assert "active" == subscription.status + assert ( + subscription.description + == f"Site in {initial_site_data[1]['site_city']}, {initial_site_data[1]['site_country']}" + )