Skip to content
Snippets Groups Projects
Commit 5d0e303e authored by Karel van Klink's avatar Karel van Klink :smiley_cat: Committed by Neda Moeini
Browse files

Split off task_validate_products

The built-in task_validate_products is now replaced by task_validate_geant_products, which omits the step that would require adding modify_note to all products. We do not use the modify_note workflow, so it does not need to be validated.
parent 3036855f
No related branches found
No related tags found
1 merge request!139Feature/add validation workflows
"""Add task_validate_geant_products.
Revision ID: 31fd1ae8d5bb
Revises: 3323bcb934e7
Create Date: 2024-07-15 15:34:00.00000
"""
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "31fd1ae8d5bb"
down_revision = "b5dfbc1ec7b2"
branch_labels = None
depends_on = None
workflows = [
{"name": "task_validate_geant_products", "description": "Validate GEANT products", "workflow_id": uuid4(), "target": "SYSTEM"},
]
def upgrade() -> None:
    """Insert the new workflow rows into the ``workflows`` table."""
    connection = op.get_bind()
    # ON CONFLICT DO NOTHING keeps the migration idempotent on re-runs.
    insert_statement = sa.text(
        "INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
    )
    for workflow_row in workflows:
        connection.execute(insert_statement, workflow_row)
def downgrade() -> None:
    """Remove the workflow rows added by :func:`upgrade`, matched by name."""
    connection = op.get_bind()
    delete_statement = sa.text("DELETE FROM workflows WHERE name = :name")
    for workflow_row in workflows:
        connection.execute(delete_statement, {"name": workflow_row["name"]})
...@@ -12,4 +12,4 @@ from gso.worker import celery ...@@ -12,4 +12,4 @@ from gso.worker import celery
def validate_products() -> None: def validate_products() -> None:
"""Validate all products.""" """Validate all products."""
if count_incomplete_validate_products() > 0: if count_incomplete_validate_products() > 0:
start_process("task_validate_products") start_process("task_validate_geant_products")
...@@ -66,6 +66,7 @@ ...@@ -66,6 +66,7 @@
"import_super_pop_switch": "NOT FOR HUMANS -- Finalize import into a Super PoP switch", "import_super_pop_switch": "NOT FOR HUMANS -- Finalize import into a Super PoP switch",
"import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear", "import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear",
"validate_iptrunk": "Validate IP Trunk configuration", "validate_iptrunk": "Validate IP Trunk configuration",
"validate_router": "Validate router configuration" "validate_router": "Validate router configuration",
"task_validate_geant_products": "Validation task for GEANT products"
} }
} }
...@@ -65,3 +65,6 @@ LazyWorkflowInstance("gso.workflows.office_router.create_imported_office_router" ...@@ -65,3 +65,6 @@ LazyWorkflowInstance("gso.workflows.office_router.create_imported_office_router"
# Opengear workflows # Opengear workflows
LazyWorkflowInstance("gso.workflows.opengear.create_imported_opengear", "create_imported_opengear") LazyWorkflowInstance("gso.workflows.opengear.create_imported_opengear", "create_imported_opengear")
LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear") LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear")
# Tasks
LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products")
"""Task workflows."""
"""A task that checks for all products in the database to be well-kept."""
# Copyright 2019-2020 SURF.
# Copyright 2024 GÉANT Vereniging.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
import orchestrator.workflows
from more_itertools.more import one
from more_itertools.recipes import first_true
from orchestrator.db import FixedInputTable, ProductTable, SubscriptionTable, db
from orchestrator.domain.base import SubscriptionModel
from orchestrator.services import products
from orchestrator.services.products import get_products
from orchestrator.services.translations import generate_translations
from orchestrator.services.workflows import get_workflow_by_name, get_workflows
from orchestrator.targets import Target
from orchestrator.types import State
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.fixed_inputs import fixed_input_configuration as fi_configuration
from orchestrator.workflow import StepList, done, init, step, workflow
from pydantic import ValidationError
from sqlalchemy import not_, select
from sqlalchemy.orm import joinedload
# Since these errors are probably programming failures we should not throw AssertionErrors
@step("Check all workflows in database")
def check_all_workflows_are_in_db() -> State:
"""Validate that all workflows exist in the database."""
all_workflows_in_db = {k.name for k in get_workflows()}
all_workflows = {k for k in orchestrator.workflows.ALL_WORKFLOWS.keys()} # noqa: C416, SIM118
not_in_db = all_workflows - all_workflows_in_db
not_in_lwi = all_workflows_in_db - all_workflows
if not_in_db or not_in_lwi:
msg = "Found missing workflows in database or implementations"
raise ProcessFailureError(
msg,
{
"Workflows not registered in the database": list(not_in_db),
"Workflows not registered in a `LazyWorkflowInstance`": list(not_in_lwi),
},
)
return {"check_all_workflows_are_in_db": True}
@step("Check workflows for matching targets and descriptions")
def check_workflows_for_matching_targets_and_descriptions() -> State:
"""Validate that all workflows' targets and descriptions match up."""
workflow_assertions = []
for key, lazy_wf in orchestrator.workflows.ALL_WORKFLOWS.items():
wf = lazy_wf.instantiate()
db_workflow = get_workflow_by_name(key)
# Test workflows might not exist in the database
if db_workflow and (
wf.target != db_workflow.target or wf.name != db_workflow.name or wf.description != db_workflow.description
):
message = (
f"Workflow {wf.name}: {wf.target} <=> {db_workflow.target}, "
f"{wf.name} <=> {db_workflow.name} and {wf.description} <=> {db_workflow.description}. "
)
workflow_assertions.append(message)
if workflow_assertions:
workflow_message = "\n".join(workflow_assertions)
msg = "Workflows with none matching targets and descriptions"
raise ProcessFailureError(msg, workflow_message)
# Check translations
translations = generate_translations("en-GB")["workflow"]
workflow_assertions = []
for key in orchestrator.workflows.ALL_WORKFLOWS:
if key not in translations:
workflow_assertions.append(key)
if workflow_assertions:
workflow_message = "\n".join(workflow_assertions)
msg = "Workflows with missing translations"
raise ProcessFailureError(msg, workflow_message)
return {"check_workflows_for_matching_targets_and_descriptions": True}
@step("Check that all products have at least one workflow")
def check_that_products_have_at_least_one_workflow() -> State:
"""All products must have at least one workflow associated to it."""
stmt = select(ProductTable).filter(not_(ProductTable.workflows.any())).with_only_columns(ProductTable.name)
prods_without_wf = db.session.scalars(stmt).all()
if prods_without_wf:
msg = "Found products that do not have a workflow associated with them"
raise ProcessFailureError(msg, prods_without_wf)
return {"check_that_products_have_at_least_one_workflow": True}
@step("Check that all products have a create, modify, terminate and validate workflow")
def check_that_products_have_create_modify_and_terminate_workflows() -> State:
"""All products must have a workflow for every workflow target that exists."""
workflow_targets = ["CREATE", "TERMINATE", "MODIFY", "SYSTEM"]
product_data = get_products(filters=[ProductTable.status == "active"])
workflows_not_complete: list = []
for product in product_data:
workflows = {c.target for c in product.workflows if c.target in workflow_targets and c.name != "modify_note"}
if len(workflows) < len(workflow_targets):
workflows_not_complete.append(product.name)
# Do not raise an error but only report it in the `State` to allow exceptions.
return {
"products_without_at_least_create_modify_terminate_validate_workflows": workflows_not_complete,
"check_that_products_have_create_modify_and_terminate_workflows": True,
}
@step("Check the DB fixed input config")
def check_db_fixed_input_config() -> State:
"""Validate the configuration of fixed inputs in the database."""
fixed_input_configuration = fi_configuration()
product_tags = products.get_tags()
stmt = select(FixedInputTable).options(joinedload(FixedInputTable.product))
fixed_inputs = db.session.scalars(stmt)
data: dict = {"fixed_inputs": [], "by_tag": {}}
errors: list = []
for tag in product_tags:
data["by_tag"][tag] = []
for fi in fixed_inputs:
fi_data: dict = first_true(
fixed_input_configuration["fixed_inputs"],
{},
lambda i: i["name"] == fi.name, # noqa: B023
)
if not fi_data:
errors.append(fi)
if fi.value not in fi_data["values"]:
errors.append(fi)
tag_data = {one(fi) for fi in fixed_input_configuration["by_tag"][fi.product.tag]}
tag_data_required = {one(fi) for fi in fixed_input_configuration["by_tag"][fi.product.tag] if fi[one(fi)]}
if not tag_data:
errors.append(fi)
if {fi.name for fi in fi.product.fixed_inputs} - set(tag_data):
errors.append(fi.product.name)
if set(tag_data_required) - {fi.name for fi in fi.product.fixed_inputs}:
errors.append(fi.product.name)
if errors:
msg = "Errors in fixed input config"
raise ProcessFailureError(msg, errors)
return {"check_db_fixed_input_config": True}
@step("Check subscription models")
def check_subscription_models() -> State:
"""Try and load all subscription models, to see whether any fail to load."""
subscriptions = db.session.scalars(select(SubscriptionTable))
failures: dict[str, Any] = {}
for subscription in subscriptions:
try:
SubscriptionModel.from_subscription(subscription.subscription_id)
except ValidationError as e:
failures[str(subscription.subscription_id)] = e.errors()
except Exception as e: # noqa: BLE001
failures[str(subscription.subscription_id)] = str(e)
if failures:
msg = "Found subscriptions that could not be loaded"
raise ProcessFailureError(msg, failures)
return {"check_subscription_models": True}
@workflow("Validate GEANT products", target=Target.SYSTEM)
def task_validate_geant_products() -> StepList:
"""Validate products in the database.
This task is based on the ``task_validate_products`` present in ``orchestrator-core`` but it does not check for the
existence of the ``modify_note`` workflow on all products, since this workflow is currently not used in GEANT.
"""
return (
init
>> check_all_workflows_are_in_db
>> check_workflows_for_matching_targets_and_descriptions
>> check_that_products_have_at_least_one_workflow
>> check_db_fixed_input_config
>> check_that_products_have_create_modify_and_terminate_workflows
>> check_subscription_models
>> done
)
...@@ -14,8 +14,8 @@ def validate_subscriptions(): ...@@ -14,8 +14,8 @@ def validate_subscriptions():
@pytest.fixture() @pytest.fixture()
def mock_get_insync_subscriptions(): def mock_get_active_insync_subscriptions():
with patch("gso.schedules.validate_subscriptions.get_insync_subscriptions") as mock: with patch("gso.schedules.validate_subscriptions.get_active_insync_subscriptions") as mock:
yield mock yield mock
...@@ -82,24 +82,24 @@ def test_scheduled_task_still_works(): ...@@ -82,24 +82,24 @@ def test_scheduled_task_still_works():
assert result == "task result" assert result == "task result"
def test_no_subscriptions(mock_get_insync_subscriptions, mock_logger, validate_subscriptions): def test_no_subscriptions(mock_get_active_insync_subscriptions, mock_logger, validate_subscriptions):
mock_get_insync_subscriptions.return_value = [] mock_get_active_insync_subscriptions.return_value = []
validate_subscriptions() validate_subscriptions()
mock_logger.info.assert_called_once_with("No subscriptions to validate") mock_logger.info.assert_called_once_with("No subscriptions to validate")
def test_subscriptions_without_system_target_workflow( def test_subscriptions_without_system_target_workflow(
mock_get_insync_subscriptions, mock_get_active_insync_subscriptions,
mock_logger, mock_logger,
validate_subscriptions, validate_subscriptions,
): ):
mock_get_insync_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))] mock_get_active_insync_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))]
validate_subscriptions() validate_subscriptions()
mock_logger.warning.assert_called_once() mock_logger.warning.assert_called_once()
def test_subscription_status_not_usable( def test_subscription_status_not_usable(
mock_get_insync_subscriptions, mock_get_active_insync_subscriptions,
mock_get_execution_context, mock_get_execution_context,
validate_subscriptions, validate_subscriptions,
): ):
...@@ -107,7 +107,7 @@ def test_subscription_status_not_usable( ...@@ -107,7 +107,7 @@ def test_subscription_status_not_usable(
subscription_mock.product.workflows = [MagicMock(target=Target.SYSTEM, name="workflow_name")] subscription_mock.product.workflows = [MagicMock(target=Target.SYSTEM, name="workflow_name")]
subscription_mock.status = "Not Usable Status" subscription_mock.status = "Not Usable Status"
mock_get_insync_subscriptions.return_value = [subscription_mock] mock_get_active_insync_subscriptions.return_value = [subscription_mock]
validate_subscriptions() validate_subscriptions()
validate_func = mock_get_execution_context()["validate"] validate_func = mock_get_execution_context()["validate"]
...@@ -115,7 +115,7 @@ def test_subscription_status_not_usable( ...@@ -115,7 +115,7 @@ def test_subscription_status_not_usable(
def test_valid_subscriptions_for_validation( def test_valid_subscriptions_for_validation(
mock_get_insync_subscriptions, mock_get_active_insync_subscriptions,
mock_get_execution_context, mock_get_execution_context,
validate_subscriptions, validate_subscriptions,
): ):
...@@ -123,7 +123,7 @@ def test_valid_subscriptions_for_validation( ...@@ -123,7 +123,7 @@ def test_valid_subscriptions_for_validation(
mocked_workflow = MagicMock(target=Target.SYSTEM, name="workflow_name") mocked_workflow = MagicMock(target=Target.SYSTEM, name="workflow_name")
subscription_mock.product.workflows = [mocked_workflow] subscription_mock.product.workflows = [mocked_workflow]
subscription_mock.status = "active" subscription_mock.status = "active"
mock_get_insync_subscriptions.return_value = [subscription_mock] mock_get_active_insync_subscriptions.return_value = [subscription_mock]
validate_subscriptions() validate_subscriptions()
validate_func = mock_get_execution_context()["validate"] validate_func = mock_get_execution_context()["validate"]
validate_func.assert_called_once_with( validate_func.assert_called_once_with(
......
...@@ -15,7 +15,7 @@ from test.workflows import ( ...@@ -15,7 +15,7 @@ from test.workflows import (
@pytest.mark.workflow() @pytest.mark.workflow()
@patch("gso.workflows.iptrunk.terminate_iptrunk.execute_playbook") @patch("gso.workflows.iptrunk.terminate_iptrunk.execute_playbook")
@patch("gso.utils.workflow_steps.execute_playbook") @patch("gso.utils.workflow_steps.lso_client.execute_playbook")
@patch("gso.workflows.iptrunk.terminate_iptrunk.infoblox.delete_network") @patch("gso.workflows.iptrunk.terminate_iptrunk.infoblox.delete_network")
@patch("gso.services.netbox_client.NetboxClient.delete_interface") @patch("gso.services.netbox_client.NetboxClient.delete_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface") @patch("gso.services.netbox_client.NetboxClient.free_interface")
......
...@@ -36,7 +36,7 @@ def router_creation_input_form_data(site_subscription_factory, faker): ...@@ -36,7 +36,7 @@ def router_creation_input_form_data(site_subscription_factory, faker):
@pytest.mark.workflow() @pytest.mark.workflow()
@patch("gso.utils.workflow_steps.execute_playbook") @patch("gso.utils.workflow_steps.lso_client.execute_playbook")
@patch("gso.workflows.router.create_router.NetboxClient.create_device") @patch("gso.workflows.router.create_router.NetboxClient.create_device")
@patch("gso.workflows.router.create_router.infoblox.hostname_available") @patch("gso.workflows.router.create_router.infoblox.hostname_available")
@patch("gso.workflows.router.create_router.infoblox.find_host_by_fqdn") @patch("gso.workflows.router.create_router.infoblox.find_host_by_fqdn")
...@@ -117,7 +117,7 @@ def test_create_nokia_router_success( ...@@ -117,7 +117,7 @@ def test_create_nokia_router_success(
@pytest.mark.workflow() @pytest.mark.workflow()
@patch("gso.utils.workflow_steps.execute_playbook") @patch("gso.utils.workflow_steps.lso_client.execute_playbook")
@patch("gso.workflows.router.create_router.NetboxClient.create_device") @patch("gso.workflows.router.create_router.NetboxClient.create_device")
@patch("gso.workflows.router.create_router.infoblox.hostname_available") @patch("gso.workflows.router.create_router.infoblox.hostname_available")
@patch("gso.workflows.router.create_router.infoblox.find_network_by_cidr") @patch("gso.workflows.router.create_router.infoblox.find_network_by_cidr")
......
...@@ -4,8 +4,8 @@ from test.workflows import assert_complete, extract_state, run_workflow ...@@ -4,8 +4,8 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow() @pytest.mark.workflow()
def test_task_validate_products(responses, faker): def test_task_validate_geant_products(responses, faker):
result, _, _ = run_workflow("task_validate_products", [{}]) result, _, _ = run_workflow("task_validate_geant_products", [{}])
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment