diff --git a/docs/source/module/workflows/index.rst b/docs/source/module/workflows/index.rst index d3f0d0a2eff42a8f3693d7c04f83e05ad2bdf6c1..3ee41bad0534418b8417ef5cd037795a9681628d 100644 --- a/docs/source/module/workflows/index.rst +++ b/docs/source/module/workflows/index.rst @@ -14,7 +14,8 @@ Subpackages iptrunk/index office_router/index + opengear/index router/index site/index super_pop_switch/index - opengear/index + tasks/index diff --git a/docs/source/module/workflows/tasks/index.rst b/docs/source/module/workflows/tasks/index.rst new file mode 100644 index 0000000000000000000000000000000000000000..640f1184ec743d2bab5f0b183e85103ca8a6b8e1 --- /dev/null +++ b/docs/source/module/workflows/tasks/index.rst @@ -0,0 +1,15 @@ +``gso.workflows.tasks`` +======================= + +.. automodule:: gso.workflows.tasks + :members: + :show-inheritance: + +Submodules +---------- + +.. toctree:: + :maxdepth: 2 + :titlesonly: + + validate_geant_products diff --git a/docs/source/module/workflows/tasks/validate_geant_products.rst b/docs/source/module/workflows/tasks/validate_geant_products.rst new file mode 100644 index 0000000000000000000000000000000000000000..540b73460053cae0aad8fdb9a3b3ba7b68dc719c --- /dev/null +++ b/docs/source/module/workflows/tasks/validate_geant_products.rst @@ -0,0 +1,6 @@ +``gso.workflows.tasks.validate_geant_products`` +=============================================== + +.. automodule:: gso.workflows.tasks.validate_geant_products + :members: + :show-inheritance: diff --git a/gso/workflows/tasks/validate_geant_products.py b/gso/workflows/tasks/validate_geant_products.py index 8bad9d6dc88f6dbe3474e161b7d514964af9827a..cb5ac3c83108e6f4e68e59d13e76027b576871de 100644 --- a/gso/workflows/tasks/validate_geant_products.py +++ b/gso/workflows/tasks/validate_geant_products.py @@ -14,178 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any - -import orchestrator.workflows -from more_itertools.more import one -from more_itertools.recipes import first_true -from orchestrator.db import FixedInputTable, ProductTable, SubscriptionTable, db -from orchestrator.domain.base import SubscriptionModel -from orchestrator.services import products -from orchestrator.services.products import get_products -from orchestrator.services.translations import generate_translations -from orchestrator.services.workflows import get_workflow_by_name, get_workflows from orchestrator.targets import Target -from orchestrator.types import State -from orchestrator.utils.errors import ProcessFailureError -from orchestrator.utils.fixed_inputs import fixed_input_configuration as fi_configuration -from orchestrator.workflow import StepList, done, init, step, workflow -from pydantic import ValidationError -from sqlalchemy import not_, select -from sqlalchemy.orm import joinedload - -# Since these errors are probably programming failures we should not throw AssertionErrors - - -@step("Check all workflows in database") -def check_all_workflows_are_in_db() -> State: - """Validate that all workflows exist in the database.""" - all_workflows_in_db = {k.name for k in get_workflows()} - all_workflows = {k for k in orchestrator.workflows.ALL_WORKFLOWS.keys()} # noqa: C416, SIM118 - not_in_db = all_workflows - all_workflows_in_db - not_in_lwi = all_workflows_in_db - all_workflows - if not_in_db or not_in_lwi: - msg = "Found missing workflows in database or implementations" - raise ProcessFailureError( - msg, - { - "Workflows not registered in the database": list(not_in_db), - "Workflows not registered in a `LazyWorkflowInstance`": list(not_in_lwi), - }, - ) - - return {"check_all_workflows_are_in_db": True} - - -@step("Check workflows for matching targets and descriptions") -def check_workflows_for_matching_targets_and_descriptions() -> State: - """Validate that all workflows' targets and descriptions match up.""" - workflow_assertions = [] - for key, lazy_wf in orchestrator.workflows.ALL_WORKFLOWS.items(): - wf = lazy_wf.instantiate() - db_workflow = get_workflow_by_name(key) - # Test workflows might not exist in the database - if db_workflow and ( - wf.target != db_workflow.target or wf.name != db_workflow.name or wf.description != db_workflow.description - ): - message = ( - f"Workflow {wf.name}: {wf.target} <=> {db_workflow.target}, " - f"{wf.name} <=> {db_workflow.name} and {wf.description} <=> {db_workflow.description}. " - ) - workflow_assertions.append(message) - - if workflow_assertions: - workflow_message = "\n".join(workflow_assertions) - msg = "Workflows with none matching targets and descriptions" - raise ProcessFailureError(msg, workflow_message) - - # Check translations - translations = generate_translations("en-GB")["workflow"] - workflow_assertions = [] - for key in orchestrator.workflows.ALL_WORKFLOWS: - if key not in translations: - workflow_assertions.append(key) - - if workflow_assertions: - workflow_message = "\n".join(workflow_assertions) - msg = "Workflows with missing translations" - raise ProcessFailureError(msg, workflow_message) - - return {"check_workflows_for_matching_targets_and_descriptions": True} - - -@step("Check that all products have at least one workflow") -def check_that_products_have_at_least_one_workflow() -> State: - """All products must have at least one workflow associated to it.""" - stmt = select(ProductTable).filter(not_(ProductTable.workflows.any())).with_only_columns(ProductTable.name) - prods_without_wf = db.session.scalars(stmt).all() - if prods_without_wf: - msg = "Found products that do not have a workflow associated with them" - raise ProcessFailureError(msg, prods_without_wf) - - return {"check_that_products_have_at_least_one_workflow": True} - - -@step("Check that all products have a create, modify, terminate and validate workflow") -def check_that_products_have_create_modify_and_terminate_workflows() -> State: - """All products must have a workflow for every workflow target that exists.""" - workflow_targets = ["CREATE", "TERMINATE", "MODIFY", "SYSTEM"] - product_data = get_products(filters=[ProductTable.status == "active"]) - - workflows_not_complete: list = [] - for product in product_data: - workflows = {c.target for c in product.workflows if c.target in workflow_targets and c.name != "modify_note"} - if len(workflows) < len(workflow_targets): - workflows_not_complete.append(product.name) - - # Do not raise an error but only report it in the `State` to allow exceptions. - return { - "products_without_at_least_create_modify_terminate_validate_workflows": workflows_not_complete, - "check_that_products_have_create_modify_and_terminate_workflows": True, - } - - -@step("Check the DB fixed input config") -def check_db_fixed_input_config() -> State: - """Validate the configuration of fixed inputs in the database.""" - fixed_input_configuration = fi_configuration() - product_tags = products.get_tags() - stmt = select(FixedInputTable).options(joinedload(FixedInputTable.product)) - fixed_inputs = db.session.scalars(stmt) - - data: dict = {"fixed_inputs": [], "by_tag": {}} - errors: list = [] - - for tag in product_tags: - data["by_tag"][tag] = [] - for fi in fixed_inputs: - fi_data: dict = first_true( - fixed_input_configuration["fixed_inputs"], - {}, - lambda i: i["name"] == fi.name, # noqa: B023 - ) - if not fi_data: - errors.append(fi) - - if fi.value not in fi_data["values"]: - errors.append(fi) - - tag_data = {one(fi) for fi in fixed_input_configuration["by_tag"][fi.product.tag]} - tag_data_required = {one(fi) for fi in fixed_input_configuration["by_tag"][fi.product.tag] if fi[one(fi)]} - - if not tag_data: - errors.append(fi) - - if {fi.name for fi in fi.product.fixed_inputs} - set(tag_data): - errors.append(fi.product.name) - if set(tag_data_required) - {fi.name for fi in fi.product.fixed_inputs}: - errors.append(fi.product.name) - - if errors: - msg = "Errors in fixed input config" - raise ProcessFailureError(msg, errors) - - return {"check_db_fixed_input_config": True} - - -@step("Check subscription models") -def check_subscription_models() -> State: - """Try and load all subscription models, to see whether any fail to load.""" - subscriptions = db.session.scalars(select(SubscriptionTable)) - failures: dict[str, Any] = {} - for subscription in subscriptions: - try: - SubscriptionModel.from_subscription(subscription.subscription_id) - except ValidationError as e: - failures[str(subscription.subscription_id)] = e.errors() - except Exception as e: # noqa: BLE001 - failures[str(subscription.subscription_id)] = str(e) - - if failures: - msg = "Found subscriptions that could not be loaded" - raise ProcessFailureError(msg, failures) - - return {"check_subscription_models": True} +from orchestrator.workflow import StepList, done, init, workflow +from orchestrator.workflows.tasks.validate_products import ( + check_all_workflows_are_in_db, + check_db_fixed_input_config, + check_subscription_models, + check_that_products_have_at_least_one_workflow, + check_that_products_have_create_modify_and_terminate_workflows, + check_workflows_for_matching_targets_and_descriptions, +) @workflow("Validate GEANT products", target=Target.SYSTEM)