diff --git a/Changelog.md b/Changelog.md index 5bc64facb7044708a17603795998fe50cf98face..e88b0515ae3bf8c12debdf90aa836c8e1bb8a8fc 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,9 @@ # Changelog +# [3.10] - 2025-06-17 +- Added parallelized massive base config redeploy +- Added and integrated R&E Peer and R&E LHCONE workflows, product types, and related migrations. + # [3.9] - 2025-06-05 - Add threshold input for BGP routes in L3 Core migration workflows and forward the value to Moodi. diff --git a/gso/cli/imports.py b/gso/cli/imports.py index 66c864594f15ede86a674e471af830d68dc5dead..700a63b42168861e24915340d99c37ceda9e5e01 100644 --- a/gso/cli/imports.py +++ b/gso/cli/imports.py @@ -700,6 +700,8 @@ def import_l3_core_service(filepath: str = common_filepath_option) -> None: ProductType.IMPORTED_IAS, ProductType.IMPORTED_LHCONE, ProductType.IMPORTED_COPERNICUS, + ProductType.IMPORTED_R_AND_E_PEER, + ProductType.IMPORTED_R_AND_E_LHCONE, ], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id", "product_id"], diff --git a/gso/migrations/versions/2025-05-29_9a7bae1f6438_add_mass_router_redeploy_task.py b/gso/migrations/versions/2025-05-29_9a7bae1f6438_add_mass_router_redeploy_task.py new file mode 100644 index 0000000000000000000000000000000000000000..7c21f6da597d730fa2a2a4d9ef0b986757d8bd6f --- /dev/null +++ b/gso/migrations/versions/2025-05-29_9a7bae1f6438_add_mass_router_redeploy_task.py @@ -0,0 +1,37 @@ +"""Add mass router redeploy task. + +Revision ID: 9a7bae1f6438 +Revises: 465008ed496e +Create Date: 2025-05-15 12:01:54.469229 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '9a7bae1f6438' +down_revision = '90547df711c3' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_task, delete_workflow + +new_tasks = [ + { + "name": "task_redeploy_base_config", + "description": "Redeploy base config on multiple routers" + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for task in new_tasks: + create_task(conn, task) + + +def downgrade() -> None: + conn = op.get_bind() + for task in new_tasks: + delete_workflow(conn, task["name"]) diff --git a/gso/migrations/versions/2025-06-04_1233b83e1124_add_r_e_peer_and_r_e_lhcone.py b/gso/migrations/versions/2025-06-04_1233b83e1124_add_r_e_peer_and_r_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..80192913768e337037e3b0e408b8232bc9f5db14 --- /dev/null +++ b/gso/migrations/versions/2025-06-04_1233b83e1124_add_r_e_peer_and_r_e_lhcone.py @@ -0,0 +1,101 @@ +"""Add R&E peer and R&E LHCONE. + +Revision ID: 1233b83e1124 +Revises: 9a7bae1f6438 +Create Date: 2025-06-04 13:37:22.122645 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '1233b83e1124' +down_revision = '9a7bae1f6438' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text(""" +DELETE FROM subscription_instances WHERE subscription_instances.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('PeeringConnectionBlock', 'CommercialPeerBlock', 'IXPortBlock', 'PrivatePeerPortBlock', 'L3InterfacePortBlock', 'TransitProviderPortBlock')) + """)) + conn.execute(sa.text(""" +DELETE FROM product_blocks WHERE product_blocks.name IN ('PeeringConnectionBlock', 'CommercialPeerBlock', 'IXPortBlock', 'PrivatePeerPortBlock', 'L3InterfacePortBlock', 'TransitProviderPortBlock') + """)) + conn.execute(sa.text(""" +DELETE FROM processes WHERE processes.pid IN (SELECT processes_subscriptions.pid FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Transit Provider Port', 'Private Peer Port', 'Transit Provider Port', 'Imported Private Peer Port', 'Commercial Peer', 'IX Port', 'Imported IX Port', 'Imported Commercial Peer')))) + """)) + conn.execute(sa.text(""" +DELETE FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Transit Provider Port', 'Private Peer Port', 'Transit Provider Port', 'Imported Private Peer Port', 'Commercial Peer', 'IX Port', 'Imported IX Port', 'Imported Commercial Peer'))) + """)) + conn.execute(sa.text(""" +DELETE FROM subscription_instances WHERE subscription_instances.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Transit Provider Port', 'Private Peer Port', 'Transit Provider Port', 'Imported Private Peer Port', 'Commercial Peer', 'IX Port', 'Imported IX Port', 'Imported Commercial Peer'))) + """)) + conn.execute(sa.text(""" +DELETE FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Transit Provider Port', 'Private Peer Port', 'Transit Provider Port', 'Imported Private Peer Port', 'Commercial Peer', 'IX Port', 'Imported IX Port', 'Imported Commercial Peer')) + """)) + conn.execute(sa.text(""" +DELETE FROM products WHERE products.name IN ('Imported Transit Provider Port', 'Private Peer Port', 'Transit Provider Port', 'Imported Private Peer Port', 'Commercial Peer', 'IX Port', 'Imported IX Port', 'Imported Commercial Peer') + """)) + conn.execute(sa.text(""" +INSERT INTO products (name, description, product_type, tag, status) VALUES ('R&E Peer', 'R&E Peer', 'RAndEPeer', 'RE_PEER', 'active') RETURNING products.product_id + """)) + conn.execute(sa.text(""" +INSERT INTO products (name, description, product_type, tag, status) VALUES ('Imported R&E Peer', 'Imported R&E Peer', 'ImportedRAndEPeer', 'IMP_RE_PEER', 'active') RETURNING products.product_id + """)) + conn.execute(sa.text(""" +INSERT INTO products (name, description, product_type, tag, status) VALUES ('R&E LHCOne', 'R&E LHCOne', 'RAndELHCOne', 'RE_LHCONE', 'active') RETURNING products.product_id + """)) + conn.execute(sa.text(""" +INSERT INTO products (name, description, product_type, tag, status) VALUES ('Imported R&E LHCOne', 'Imported R&E LHCOne', 'ImportedRAndELHCOne', 'IMP_RE_LHCONE', 'active') RETURNING products.product_id + """)) + conn.execute(sa.text(""" +INSERT INTO product_blocks (name, description, tag, status) VALUES ('RAndEPeerBlock', 'R&E Peer Product Block', 'RE_PEER_BLK', 'active') RETURNING product_blocks.product_block_id + """)) + conn.execute(sa.text(""" +INSERT INTO product_blocks (name, description, tag, status) VALUES ('RAndELHCOneBlock', 'R&E LLHCONE Prodcut Block', 'RE_LHCONE_BLK', 'active') RETURNING product_blocks.product_block_id + """)) + conn.execute(sa.text(""" +INSERT INTO product_product_blocks (product_id, product_block_id) VALUES ((SELECT products.product_id FROM products WHERE products.name IN ('R&E Peer')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock'))), ((SELECT products.product_id FROM products WHERE products.name IN ('Imported R&E Peer')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock'))) + """)) + conn.execute(sa.text(""" +INSERT INTO product_product_blocks (product_id, product_block_id) VALUES ((SELECT products.product_id FROM products WHERE products.name IN ('R&E LHCOne')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndELHCOneBlock'))), ((SELECT products.product_id FROM products WHERE products.name IN ('Imported R&E LHCOne')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndELHCOneBlock'))) + """)) + conn.execute(sa.text(""" +INSERT INTO product_block_relations (in_use_by_id, depends_on_id) VALUES ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('L3CoreServiceBlock'))), ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndELHCOneBlock')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('L3CoreServiceBlock'))) + """)) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text(""" +DELETE FROM product_product_blocks WHERE product_product_blocks.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('R&E Peer', 'Imported R&E Peer')) AND product_product_blocks.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock')) + """)) + conn.execute(sa.text(""" +DELETE FROM product_product_blocks WHERE product_product_blocks.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('R&E LHCOne', 'Imported R&E LHCOne')) AND product_product_blocks.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndELHCOneBlock')) + """)) + conn.execute(sa.text(""" +DELETE FROM product_block_relations WHERE product_block_relations.in_use_by_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock', 'RAndELHCOneBlock')) AND product_block_relations.depends_on_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('L3CoreServiceBlock')) + """)) + conn.execute(sa.text(""" +DELETE FROM subscription_instances WHERE subscription_instances.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock', 'RAndELHCOneBlock')) + """)) + conn.execute(sa.text(""" +DELETE FROM product_blocks WHERE product_blocks.name IN ('RAndEPeerBlock', 'RAndELHCOneBlock') + """)) + conn.execute(sa.text(""" +DELETE FROM processes WHERE processes.pid IN (SELECT processes_subscriptions.pid FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('R&E LHCOne', 'R&E Peer', 'Imported R&E Peer', 'Imported R&E LHCOne')))) + """)) + conn.execute(sa.text(""" +DELETE FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('R&E LHCOne', 'R&E Peer', 'Imported R&E Peer', 'Imported R&E LHCOne'))) + """)) + conn.execute(sa.text(""" +DELETE FROM subscription_instances WHERE subscription_instances.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('R&E LHCOne', 'R&E Peer', 'Imported R&E Peer', 'Imported R&E LHCOne'))) + """)) + conn.execute(sa.text(""" +DELETE FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('R&E LHCOne', 'R&E Peer', 'Imported R&E Peer', 'Imported R&E LHCOne')) + """)) + conn.execute(sa.text(""" +DELETE FROM products WHERE products.name IN ('R&E LHCOne', 'R&E Peer', 'Imported R&E Peer', 'Imported R&E LHCOne') + """)) diff --git a/gso/migrations/versions/2025-06-04_29813f5b2c95_add_r_and_e_peers_workflows.py b/gso/migrations/versions/2025-06-04_29813f5b2c95_add_r_and_e_peers_workflows.py new file mode 100644 index 0000000000000000000000000000000000000000..2615003ea3152f1eaab2039e44f532f53ce9284b --- /dev/null +++ b/gso/migrations/versions/2025-06-04_29813f5b2c95_add_r_and_e_peers_workflows.py @@ -0,0 +1,81 @@ +"""Add R and E peers workflows. + +Revision ID: 29813f5b2c95 +Revises: 1233b83e1124 +Create Date: 2025-06-04 14:17:20.069790 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '29813f5b2c95' +down_revision = '1233b83e1124' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +new_workflows = [ + { + "name": "create_r_and_e_peer", + "target": "CREATE", + "description": "Create R&E Peer", + "product_type": "RAndEPeer" + }, + { + "name": "modify_r_and_e_peer", + "target": "MODIFY", + "description": "Modify R&E Peer", + "product_type": "RAndEPeer" + }, + { + "name": "terminate_r_and_e_peer", + "target": "TERMINATE", + "description": "Terminate R&E Peer", + "product_type": "RAndEPeer" + }, + { + "name": "migrate_r_and_e_peer", + "target": "MODIFY", + "description": "Migrate R&E Peer", + "product_type": "RAndEPeer" + }, + { + "name": "create_imported_r_and_e_peer", + "target": "CREATE", + "description": "Create Imported R&E Peer", + "product_type": "ImportedRAndEPeer" + }, + { + "name": "import_r_and_e_peer", + "target": "MODIFY", + "description": "Import R&E Peer", + "product_type": "ImportedRAndEPeer" + }, + { + "name": "validate_r_and_e_peer", + "target": "SYSTEM", + "description": "Validate R&E Peer", + "product_type": "RAndEPeer" + }, + { + "name": "validate_r_and_e_peer_prefix_list", + "target": "SYSTEM", + "description": "Validate R&E Peer Prefix-List", + "product_type": "RAndEPeer" + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/migrations/versions/2025-06-04_d23b59abc6a5_add_r_e_lhcone_wfs.py b/gso/migrations/versions/2025-06-04_d23b59abc6a5_add_r_e_lhcone_wfs.py new file mode 100644 index 0000000000000000000000000000000000000000..5924c3abffefaedd8938ef43afe0a7d9965ed5ef --- /dev/null +++ b/gso/migrations/versions/2025-06-04_d23b59abc6a5_add_r_e_lhcone_wfs.py @@ -0,0 +1,75 @@ +"""Add R&E LHCONE WFs. + +Revision ID: d23b59abc6a5 +Revises: 29813f5b2c95 +Create Date: 2025-06-04 16:08:56.288635 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'd23b59abc6a5' +down_revision = '29813f5b2c95' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +new_workflows = [ + { + "name": "create_r_and_e_lhcone", + "target": "CREATE", + "description": "Create R&E LHCONE", + "product_type": "RAndELHCOne" + }, + { + "name": "modify_r_and_e_lhcone", + "target": "MODIFY", + "description": "Modify R&E LHCONE", + "product_type": "RAndELHCOne" + }, + { + "name": "terminate_r_and_e_lhcone", + "target": "TERMINATE", + "description": "Terminate R&E LHCONE", + "product_type": "RAndELHCOne" + }, + { + "name": "migrate_r_and_e_lhcone", + "target": "MODIFY", + "description": "Migrate R&E LHCONE", + "product_type": "RAndELHCOne" + }, + { + "name": "create_imported_r_and_e_lhcone", + "target": "CREATE", + "description": "Create Imported R&E LHCONE", + "product_type": "ImportedRAndELHCOne" + }, + { + "name": "import_r_and_e_lhcone", + "target": "MODIFY", + "description": "Import R&E LHCONE", + "product_type": "ImportedRAndELHCOne" + }, + { + "name": "validate_r_and_e_lhcone", + "target": "SYSTEM", + "description": "Validate R&E LHCONE", + "product_type": "RAndELHCOne" + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/products/__init__.py b/gso/products/__init__.py index fff581523777c10fc3461bb16ce72cdd1b906548..8ad0e63272e365ed25e1d05d92246ffa747985b6 100644 --- a/gso/products/__init__.py +++ b/gso/products/__init__.py @@ -19,6 +19,8 @@ from gso.products.product_types.lhcone import ImportedLHCOne, LHCOne from gso.products.product_types.office_router import ImportedOfficeRouter, OfficeRouter from gso.products.product_types.opengear import ImportedOpengear, Opengear from gso.products.product_types.pop_vlan import PopVlan +from gso.products.product_types.r_and_e_lhcone import ImportedRAndELHCOne, RAndELHCOne +from gso.products.product_types.r_and_e_peer import ImportedRAndEPeer, RAndEPeer from gso.products.product_types.router import ImportedRouter, Router from gso.products.product_types.site import ImportedSite, Site from gso.products.product_types.super_pop_switch import ImportedSuperPopSwitch, SuperPopSwitch @@ -67,6 +69,12 @@ class ProductName(strEnum): LHCONE = "LHCOne" """LHCOne.""" IMPORTED_LHCONE = "Imported LHCOne" + R_AND_E_PEER = "R&E Peer" + """R&E Peer products.""" + IMPORTED_R_AND_E_PEER = "Imported R&E Peer" + R_AND_E_LHCONE = "R&E LHCOne" + """R&E LHCOne products.""" + IMPORTED_R_AND_E_LHCONE = "Imported R&E LHCOne" COPERNICUS = "Copernicus" """Copernicus.""" IMPORTED_COPERNICUS = "Imported Copernicus" @@ -116,6 +124,10 @@ class ProductType(strEnum): IMPORTED_GEANT_IP = ImportedGeantIP.__name__ LHCONE = LHCOne.__name__ IMPORTED_LHCONE = ImportedLHCOne.__name__ + R_AND_E_PEER = RAndEPeer.__name__ + IMPORTED_R_AND_E_PEER = ImportedRAndEPeer.__name__ + R_AND_E_LHCONE = RAndELHCOne.__name__ + IMPORTED_R_AND_E_LHCONE = ImportedRAndELHCOne.__name__ COPERNICUS = Copernicus.__name__ IMPORTED_COPERNICUS = ImportedCopernicus.__name__ @@ -147,6 +159,10 @@ SUBSCRIPTION_MODEL_REGISTRY.update( ProductName.IMPORTED_IAS.value: ImportedIAS, ProductName.LHCONE.value: LHCOne, ProductName.IMPORTED_LHCONE.value: ImportedLHCOne, + ProductName.R_AND_E_PEER.value: RAndEPeer, + ProductName.IMPORTED_R_AND_E_PEER.value: ImportedRAndEPeer, + ProductName.R_AND_E_LHCONE.value: RAndELHCOne, + ProductName.IMPORTED_R_AND_E_LHCONE.value: ImportedRAndELHCOne, ProductName.COPERNICUS.value: Copernicus, ProductName.IMPORTED_COPERNICUS.value: ImportedCopernicus, ProductName.GEANT_PLUS.value: Layer2Circuit, diff --git a/gso/products/product_blocks/r_and_e_lhcone.py b/gso/products/product_blocks/r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..e4dda2e22a52953ddf2dcf333640bed98a9ef76e --- /dev/null +++ b/gso/products/product_blocks/r_and_e_lhcone.py @@ -0,0 +1,30 @@ +"""Product blocks for R&E LHCONE products.""" + +from orchestrator.domain.base import ProductBlockModel +from orchestrator.types import SubscriptionLifecycle + +from gso.products.product_blocks.l3_core_service import ( + L3CoreServiceBlock, + L3CoreServiceBlockInactive, + L3CoreServiceBlockProvisioning, +) + + +class RAndELHCOneBlockInactive( + ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="RAndELHCOneBlock" +): + """An inactive R&E LHCONE product block. See `RAndELHCOneBlock`.""" + + l3_core: L3CoreServiceBlockInactive + + +class RAndELHCOneBlockProvisioning(RAndELHCOneBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): + """A provisioning R&E LHCONE product block. See `RAndELHCOneBlock`.""" + + l3_core: L3CoreServiceBlockProvisioning + + +class RAndELHCOneBlock(RAndELHCOneBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): + """An active R&E LHCONE product block.""" + + l3_core: L3CoreServiceBlock diff --git a/gso/products/product_blocks/r_and_e_peer.py b/gso/products/product_blocks/r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..8fbbeb84e62ee88b944eeb4782ee1800d723a405 --- /dev/null +++ b/gso/products/product_blocks/r_and_e_peer.py @@ -0,0 +1,30 @@ +"""Product blocks for R&E Peer products.""" + +from orchestrator.domain.base import ProductBlockModel +from orchestrator.types import SubscriptionLifecycle + +from gso.products.product_blocks.l3_core_service import ( + L3CoreServiceBlock, + L3CoreServiceBlockInactive, + L3CoreServiceBlockProvisioning, +) + + +class RAndEPeerBlockInactive( + ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="RAndEPeerBlock" +): + """An inactive R&E Peer product block. See `RAndEPeerBlock`.""" + + l3_core: L3CoreServiceBlockInactive + + +class RAndEPeerBlockProvisioning(RAndEPeerBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): + """A provisioning R&E Peer product block. See `RAndEPeerBlock`.""" + + l3_core: L3CoreServiceBlockProvisioning + + +class RAndEPeerBlock(RAndEPeerBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): + """An active R&E Peer product block.""" + + l3_core: L3CoreServiceBlock diff --git a/gso/products/product_types/r_and_e_lhcone.py b/gso/products/product_types/r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..820a9a844c60e39cd1869056f48636aefaec5c72 --- /dev/null +++ b/gso/products/product_types/r_and_e_lhcone.py @@ -0,0 +1,81 @@ +"""Product type for R&E LHCONE.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from orchestrator.types import SubscriptionLifecycle + +from gso.products.product_blocks.r_and_e_lhcone import ( + RAndELHCOneBlock, + RAndELHCOneBlockInactive, + RAndELHCOneBlockProvisioning, +) +from gso.products.product_types.l3_core_service import BaseL3SubscriptionModel + +if TYPE_CHECKING: + from gso.products.product_blocks.l3_core_service import ( + L3CoreServiceBlockInactive, + ) + + +class RAndELHCOneInactive(BaseL3SubscriptionModel, is_base=True): + """An R&E LHCONE product that is inactive.""" + + r_and_e_lhcone: RAndELHCOneBlockInactive + + @property + def l3_core(self) -> L3CoreServiceBlockInactive: + """Getter: Retrieve the l3_core from the r_and_e_lhcone attribute.""" + return self.r_and_e_lhcone.l3_core + + @l3_core.setter + def l3_core(self, value: L3CoreServiceBlockInactive) -> None: + """Setter: Set the l3_core on the r_and_e_lhcone attribute.""" + self.r_and_e_lhcone.l3_core = value + + @property + def service_name_attribute(self) -> str: + """Get the service name.""" + return "r_and_e_lhcone" + + +class RAndELHCOneProvisioning(RAndELHCOneInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): + """An R&E LHCONE product that is being provisioned.""" + + r_and_e_lhcone: RAndELHCOneBlockProvisioning + + +class RAndELHCOne(RAndELHCOneProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): + """An R&E LHCONE product that is active.""" + + r_and_e_lhcone: RAndELHCOneBlock + + +class ImportedRAndELHCOneInactive(BaseL3SubscriptionModel, is_base=True): + """An imported R&E LHCONE product that is inactive.""" + + r_and_e_lhcone: RAndELHCOneBlockInactive + + @property + def l3_core(self) -> L3CoreServiceBlockInactive: + """Getter: Retrieve the l3_core from the r_and_e_lhcone attribute.""" + return self.r_and_e_lhcone.l3_core + + @l3_core.setter + def l3_core(self, value: L3CoreServiceBlockInactive) -> None: + """Setter: Set the l3_core on the r_and_e_lhcone attribute.""" + self.r_and_e_lhcone.l3_core = value + + @property + def service_name_attribute(self) -> str: + """Get the service name.""" + return "r_and_e_lhcone" + + +class ImportedRAndELHCOne( + ImportedRAndELHCOneInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE] +): + """An imported R&E LHCONE product that is active.""" + + r_and_e_lhcone: RAndELHCOneBlock diff --git a/gso/products/product_types/r_and_e_peer.py b/gso/products/product_types/r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..6ea04f374ac5255354a1718d95a564514a9c6c05 --- /dev/null +++ b/gso/products/product_types/r_and_e_peer.py @@ -0,0 +1,81 @@ +"""Product type for R&E Peer.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from orchestrator.types import SubscriptionLifecycle + +from gso.products.product_blocks.r_and_e_peer import ( + RAndEPeerBlock, + RAndEPeerBlockInactive, + RAndEPeerBlockProvisioning, +) +from gso.products.product_types.l3_core_service import BaseL3SubscriptionModel + +if TYPE_CHECKING: + from gso.products.product_blocks.l3_core_service import ( + L3CoreServiceBlockInactive, + ) + + +class RAndEPeerInactive(BaseL3SubscriptionModel, is_base=True): + """An R&E Peer product that is inactive.""" + + r_and_e_peer: RAndEPeerBlockInactive + + @property + def l3_core(self) -> L3CoreServiceBlockInactive: + """Getter: Retrieve the l3_core from the R&E Peer attribute.""" + return self.r_and_e_peer.l3_core + + @l3_core.setter + def l3_core(self, value: L3CoreServiceBlockInactive) -> None: + """Setter: Set the l3_core on the R&E Peer attribute.""" + self.r_and_e_peer.l3_core = value + + @property + def service_name_attribute(self) -> str: + """Get the service name.""" + return "r_and_e_peer" + + +class RAndEPeerProvisioning(RAndEPeerInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): + """An R&E Peer product that is being provisioned.""" + + r_and_e_peer: RAndEPeerBlockProvisioning + + +class RAndEPeer(RAndEPeerProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): + """An R&E Peer product that is active.""" + + r_and_e_peer: RAndEPeerBlock + + +class ImportedRAndEPeerInactive(BaseL3SubscriptionModel, is_base=True): + """An imported R&E Peer product that is inactive.""" + + r_and_e_peer: RAndEPeerBlockInactive + + @property + def l3_core(self) -> L3CoreServiceBlockInactive: + """Getter: Retrieve the l3_core from the R&E Peer attribute.""" + return self.r_and_e_peer.l3_core + + @l3_core.setter + def l3_core(self, value: L3CoreServiceBlockInactive) -> None: + """Setter: Set the l3_core on the R&E Peer attribute.""" + self.r_and_e_peer.l3_core = value + + @property + def service_name_attribute(self) -> str: + """Get the service name.""" + return "r_and_e_peer" + + +class ImportedRAndEPeer( + ImportedRAndEPeerInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE] +): + """An imported R&E Peer product that is active.""" + + r_and_e_peer: RAndEPeerBlock diff --git a/gso/services/lso_client.py b/gso/services/lso_client.py index 67e2791752d1d5d9c4be584b21915e9f5ec9832a..8cfa6785a55d693283093ed80872a999908cf9b9 100644 --- a/gso/services/lso_client.py +++ b/gso/services/lso_client.py @@ -56,7 +56,7 @@ def _send_request(parameters: dict, callback_route: str) -> None: logger.debug(debug_msg) parameters.update({"callback": callback_url}) - url = f"{params.scheme}://{params.api_base}/api/playbook" + url = f"{params.scheme}://{params.api_base}/api/playbook/" request = requests.post(url, json=parameters, timeout=10) request.raise_for_status() diff --git a/gso/services/processes.py b/gso/services/processes.py index dba8abc29b2c86fec9b74064acf1d4b25c0c44f8..30caa96fddf3d997fa9ac91254d5934f5707a830 100644 --- a/gso/services/processes.py +++ b/gso/services/processes.py @@ -4,11 +4,13 @@ This prevents someone from having to re-write database statements many times, th or inconsistent when not careful. These methods are related to operations regarding processes and workflows. """ -from orchestrator.db import ProcessTable, WorkflowTable, db -from orchestrator.workflow import ProcessStatus +from uuid import UUID + +from orchestrator.db import ProcessStepTable, ProcessSubscriptionTable, ProcessTable, WorkflowTable, db +from orchestrator.workflow import ProcessStatus, StepStatus from pydantic_forms.types import UUIDstr from sqlalchemy import ScalarResult, and_, or_, select -from sqlalchemy.orm import Query +from sqlalchemy.orm import Query, joinedload def get_processes_by_workflow_name(workflow_name: str) -> Query: @@ -65,3 +67,21 @@ def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarRes ) ) ) + + +def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None: + """Get a stopped process by its ID.""" + return ( + db.session.query(ProcessTable) + .join(ProcessTable.steps) + .filter( + ProcessTable.process_id == process_id, + ProcessStepTable.status.in_({StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}), + ) + .options( + joinedload(ProcessTable.steps), + joinedload(ProcessTable.process_subscriptions).joinedload(ProcessSubscriptionTable.subscription), + ) + .order_by(ProcessTable.last_modified_at) + .one_or_none() + ) diff --git a/gso/tasks/massive_redeploy_base_config.py b/gso/tasks/massive_redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..011df8dac042af59d2d580484917c947852c59fb --- /dev/null +++ b/gso/tasks/massive_redeploy_base_config.py @@ -0,0 +1,110 @@ +"""Massive redeploy base config task for routers.""" + +import logging + +import requests +from celery import chord, shared_task +from orchestrator.services.processes import start_process +from orchestrator.workflow import ProcessStatus +from pydantic_forms.exceptions import FormValidationError +from pydantic_forms.types import UUIDstr + +from gso import settings +from gso.products.product_types.router import Router +from gso.utils.helpers import wait_for_workflow_to_stop +from gso.utils.types.tt_number import TTNumber + +logger = logging.getLogger(__name__) + + +@shared_task(ignore_result=False) +def process_one_router(router_id: UUIDstr, tt_number: TTNumber) -> tuple[str, bool, str]: + """Celery subtask to start & wait for a single router redeploy. + + Returns (router_fqdn, succeeded:bool, message:str). + """ + router_fqdn = Router.from_subscription(router_id).router.router_fqdn + succeeded = False + message = "" + try: + pid = start_process( + "redeploy_base_config", + user_inputs=[ + {"subscription_id": router_id}, + {"tt_number": tt_number, "is_massive_redeploy": True}, + ], + ) + proc = wait_for_workflow_to_stop(pid, check_interval=5, max_retries=60) + if proc is None: + message = "Timed out waiting for workflow to complete" + elif proc.last_step == "Done" and proc.last_status == ProcessStatus.COMPLETED: + succeeded = True + message = "Done" + elif proc.last_status == ProcessStatus.ABORTED: + message = "Workflow was aborted" + elif proc.last_status == ProcessStatus.FAILED: + message = proc.failed_reason or "Workflow failed without a reason" + else: + message = f"Workflow status: {proc.last_status}, last step: {proc.last_step}" + + except FormValidationError as e: + message = f"Validation error: {e}" + except Exception as e: # noqa: BLE001 + message = f"Unexpected error: {e}" + + return router_fqdn, succeeded, message + + +@shared_task(ignore_result=False) +def finalize_massive_redeploy( + results: list[tuple[str, bool, str]], callback_route: str, selected_routers: list[UUIDstr] +) -> None: + """Called once after all process_one_router tasks.`results` is a list of (FQDN, succeeded, message) tuples.""" + successful_wfs = {} + failed_wfs = {} + for router_fqdn, ok, msg in results: + if ok: + successful_wfs[router_fqdn] = msg + else: + failed_wfs[router_fqdn] = msg + + # fire callback + oss = settings.load_oss_params() + callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}" + payload = {"failed_wfs": failed_wfs, "successful_wfs": successful_wfs} + + try: + response = requests.post(callback_url, json=payload, timeout=30) + if not response.ok: + logger.exception( + "Callback failed", + extra={ + "status_code": response.status_code, + "response_text": response.text, + "callback_url": callback_url, + "failed_wfs": failed_wfs, + "selected_routers": selected_routers, + }, + ) + except Exception as e: + msg = f"Failed to post callback: {e}" + logger.exception( + msg, + extra={ + "callback_url": callback_url, + "failed_wfs": failed_wfs, + "selected_routers": selected_routers, + }, + ) + + +@shared_task(ignore_result=False) +def massive_redeploy_base_config_task( + selected_routers: list[UUIDstr], + tt_number: TTNumber, + callback_route: str, +) -> None: + """Kicks off one Celery subtask per router, then runs the final callback.""" + unique_ids = list(dict.fromkeys(selected_routers)) + header = [process_one_router.s(rid, tt_number) for rid in unique_ids] # type: ignore[attr-defined] + chord(header)(finalize_massive_redeploy.s(callback_route, unique_ids)) # type: ignore[attr-defined] diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json index 7991b89dd89c838c13bb45fa730ad521db3e0dc9..e3ea7bc6c1dd75b253f09bbec50844103087f84b 100644 --- a/gso/translations/en-GB.json +++ b/gso/translations/en-GB.json @@ -98,6 +98,8 @@ "create_imported_site": "NOT FOR HUMANS -- Import existing site", "create_imported_super_pop_switch": "NOT FOR HUMANS -- Import existing super PoP switch", "create_imported_switch": "NOT FOR HUMANS -- Import existing Switch", + "create_imported_r_and_e_peer": "NOT FOR HUMANS -- Import existing R&E Peer", + "create_imported_r_and_e_lhcone": "NOT FOR HUMANS -- Import existing R&E LHCONE", "create_iptrunk": "Create IP Trunk", "create_geant_ip": "Create GÉANT IP", "create_lhcone": "Create LHCOne", @@ -109,6 +111,8 @@ "create_site": "Create Site", "create_switch": "Create Switch", "create_vrf": "Create VRF", + "create_r_and_e_peer": "Create R&E Peer", + "create_r_and_e_lhcone": "Create R&E LHCONE", "deploy_twamp": "Deploy TWAMP", "import_edge_port": "NOT FOR HUMANS -- Finalize import into an Edge Port", "import_iptrunk": "NOT FOR HUMANS -- Finalize import into an IP trunk product", @@ -124,11 +128,15 @@ "import_site": "NOT FOR HUMANS -- Finalize import into a Site product", "import_super_pop_switch": "NOT FOR HUMANS -- Finalize import into a Super PoP switch", "import_switch": "NOT FOR HUMANS -- Finalize import into a Switch", + "import_r_and_e_peer": "NOT FOR HUMANS -- Finalize import into a R&E Peer", + "import_r_and_e_lhcone": "NOT FOR HUMANS -- Finalize import into a R&E LHCONE", "migrate_edge_port": "Migrate Edge Port", "migrate_iptrunk": "Migrate IP Trunk", "migrate_layer_2_circuit": "Migrate Layer 2 Circuit", "migrate_ias": "Migrate IAS", "migrate_lhcone": "Migrate LHCOne", + "migrate_r_and_e_peer": "Migrate R&E Peer", + "migrate_r_and_e_lhcone": "Migrate R&E LHCOne", "migrate_copernicus": "Migrate Copernicus", "migrate_geant_ip": "Migrate GÉANT IP", "modify_connection_strategy": "Modify connection strategy", @@ -143,6 +151,8 @@ "modify_site": "Modify Site", "modify_trunk_interface": "Modify IP Trunk interface", "modify_vrf_router_list": "Modify VRF router list", + "modify_r_and_e_peer": "Modify R&E Peer", + "modify_r_and_e_lhcone": "Modify R&E LHCONE", "promote_p_to_pe": "Promote P to PE", "redeploy_base_config": "Redeploy base config", "redeploy_vrf": "Redeploy VRF router list", @@ -151,6 +161,7 @@ "task_create_partners": "Create partner task", "task_delete_partners": "Delete partner task", "task_modify_partners": "Modify partner task", + "task_redeploy_base_config": "Redeploy base config on multiple routers", "task_send_email_notifications": "Send email notifications for failed tasks", "task_validate_geant_products": "Validation task for GEANT products", "terminate_edge_port": "Terminate Edge Port", @@ -165,6 +176,8 @@ "terminate_site": "Terminate Site", "terminate_switch": "Terminate Switch", "terminate_vrf": "Terminate VRF", + "terminate_r_and_e_peer": "Terminate R&E Peer", + "terminate_r_and_e_lhcone": "Terminate R&E LHCONE", "update_ibgp_mesh": "Update iBGP mesh", "validate_edge_port": "Validate Edge Port", "validate_iptrunk": "Validate IP Trunk configuration", @@ -175,6 +188,9 @@ "validate_lan_switch_interconnect": "Validate LAN Switch Interconnect", "validate_geant_ip_prefix_list": "Validate GÉANT IP Prefix-List", "validate_router": "Validate Router configuration", - "validate_switch": "Validate Switch configuration" + "validate_switch": "Validate Switch configuration", + "validate_r_and_e_peer": "Validate R&E Peer", + "validate_r_and_e_lhcone": "Validate R&E LHCONE", + "validate_r_and_e_peer_prefix_list": "Validate R&E Peer Prefix-List" } } diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index fc4c5dee2c279d9007965a488f721ea78f32e9f7..4e4ea7d7015fbd5632b7382e5a2e0e2923abff2c 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -1,11 +1,14 @@ """Helper methods that are used across GSO.""" +import logging import random import re +import time from ipaddress import IPv4Network, IPv6Network from typing import TYPE_CHECKING, TypeAlias, cast from uuid import UUID +from orchestrator.db import ProcessTable from orchestrator.types import SubscriptionLifecycle from pydantic_forms.types import UUIDstr from pydantic_forms.validators import Choice @@ -15,6 +18,7 @@ from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router from gso.services.netbox_client import NetboxClient from gso.services.partners import get_all_partners +from gso.services.processes import get_stopped_process_by_id from gso.services.subscriptions import ( get_active_edge_port_subscriptions, get_active_router_subscriptions, @@ -30,11 +34,13 @@ from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType from gso.utils.types.virtual_identifiers import VC_ID +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock -def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: +def available_interfaces_choices(router_id: UUID, speed: str) -> TypeAlias: """Return a list of available interfaces for a given router and speed. For Nokia routers, return a list of available interfaces. @@ -46,14 +52,17 @@ def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: interface["name"]: f"{interface["name"]} {interface["description"]}" for interface in NetboxClient().get_available_interfaces(router_id, speed) } - return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("ae member", zip(interfaces.keys(), interfaces.items(), strict=True)), + ) def available_interfaces_choices_including_current_members( router_id: UUID, speed: str, interfaces: list["IptrunkInterfaceBlock"], -) -> Choice | None: +) -> TypeAlias: """Return a list of available interfaces for a given router and speed including the current members. For Nokia routers, return a list of available interfaces. @@ -75,10 +84,13 @@ def available_interfaces_choices_including_current_members( options = { interface["name"]: f"{interface["name"]} {interface["description"]}" for interface in available_interfaces } - return Choice("ae member", zip(options.keys(), options.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("ae member", zip(options.keys(), options.items(), strict=True)), + ) -def available_lags_choices(router_id: UUID) -> Choice | None: +def available_lags_choices(router_id: UUID) -> TypeAlias: """Return a list of available lags for a given router. For Nokia routers, return a list of available lags. @@ -87,10 +99,13 @@ def available_lags_choices(router_id: UUID) -> Choice | None: if get_router_vendor(router_id) != Vendor.NOKIA: return None side_a_ae_iface_list = NetboxClient().get_available_lags(router_id) - return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)), + ) -def available_service_lags_choices(router_id: UUID) -> Choice | None: +def available_service_lags_choices(router_id: UUID) -> TypeAlias: """Return a list of available lags for a given router for services. For Nokia routers, return a list of available lags. @@ -99,7 +114,10 @@ def available_service_lags_choices(router_id: UUID) -> Choice | None: if get_router_vendor(router_id) != Vendor.NOKIA: return None side_a_ae_iface_list = NetboxClient().get_available_services_lags(router_id) - return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)), + ) def get_router_vendor(router_id: UUID) -> Vendor: @@ -222,30 +240,60 @@ def calculate_recommended_minimum_links(iptrunk_number_of_members: int, iptrunk_ return iptrunk_number_of_members -def active_site_selector() -> Choice: +def active_site_selector() -> TypeAlias: """Generate a dropdown selector for choosing an active site in an input form.""" site_subscriptions = { str(site["subscription_id"]): site["description"] for site in get_active_site_subscriptions(includes=["subscription_id", "description"]) } - return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)), + ) -def active_router_selector(*, excludes: list[UUIDstr] | None = None) -> Choice: - """Generate a dropdown selector for choosing an active Router in an input form.""" - if excludes is None: - excludes = [] +def active_router_selector(*, excludes: list[UUIDstr] | None = None) -> TypeAlias: + """Generate a dropdown selector for choosing an active Router in an input form. + + The resulting list of routers can be filtered using a list of excluded subscription IDs. + """ + excludes = excludes or [] router_subscriptions = { str(router["subscription_id"]): router["description"] for router in get_active_router_subscriptions(includes=["subscription_id", "description"]) if router["subscription_id"] not in excludes } - return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)), + ) -def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> Choice: +def active_nokia_router_selector(*, excludes: list[UUIDstr] | None = None) -> TypeAlias: + """Generate a dropdown choice list of all active Nokia routers. + + Args: + excludes: An optional list of subscription IDs that should be excluded from the resulting dropdown. + """ + excludes = excludes or [] + router_subscriptions = { + str(router.subscription_id): router.description + for router in [ + Router.from_subscription(subscription["subscription_id"]) + for subscription in get_active_router_subscriptions(["subscription_id"]) + ] + if router.subscription_id not in excludes and router.router.vendor == Vendor.NOKIA + } + + return cast( + type[Choice], + Choice.__call__("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)), + ) + + +def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> TypeAlias: """Generate a dropdown selector for choosing an active PE Router in an input form.""" excludes = excludes or [] @@ -255,17 +303,23 @@ def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> Choice: if router.subscription_id not in excludes } - return Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("Select a router", zip(routers.keys(), routers.items(), strict=True)), + ) -def active_switch_selector() -> Choice: +def active_switch_selector() -> TypeAlias: """Generate a dropdown selector for choosing an active Switch in an input form.""" switch_subscriptions = { str(switch["subscription_id"]): switch["description"] for switch in get_active_switch_subscriptions(includes=["subscription_id", "description"]) } - return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)), + ) def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> TypeAlias: @@ -293,11 +347,14 @@ def ip_trunk_service_version_selector() -> Choice: ) -def partner_choice() -> Choice: +def partner_choice() -> TypeAlias: """Return a Choice object containing a list of available partners.""" partners = {partner.partner_id: partner.name for partner in get_all_partners()} - return Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type] + return cast( + type[Choice], + Choice.__call__("Select a partner", zip(partners.values(), partners.items(), strict=True)), + ) def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None: @@ -325,7 +382,7 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non ``Ethernet`` and ``VLAN`` type circuits get their IDs from different ranges. Args: - l2c_type: type of l2circuit. + l2c_type: type of Layer 2 Circuit. max_attempts: The maximum number of attempts to generate a unique ID. Returns: @@ -344,3 +401,30 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non return VC_ID(vc_id) return None + + +def wait_for_workflow_to_stop( + process_id: UUIDstr | UUID, + check_interval: int, + max_retries: int, +) -> ProcessTable | None: + """Waits until any step in the workflow reaches a terminal status. + + :param process_id: ID of the workflow process + :param check_interval: Seconds between checks + :param max_retries: Max number of retries before giving up + :return: process object if it has stopped, None if it timed out + """ + for attempt in range(max_retries): + if process := get_stopped_process_by_id(process_id): + msg = f"✅ Process {process_id} has stopped with status: {process.last_status}" + logger.info(msg) + return process + + msg = f"⏳ Attempt {attempt + 1}/{max_retries}: Waiting for workflow to progress..." + logger.info(msg) + time.sleep(check_interval) + + msg = f"❌ Timeout reached. Workflow {process_id} did not stop after {max_retries * check_interval} seconds." + logger.error(msg) + return None diff --git a/gso/worker.py b/gso/worker.py index ed9c01ffc5fc3edf055ad48a60100f8e1911f56c..fc21f0c3bbf91e0f3c0df54e9772ce761083124b 100644 --- a/gso/worker.py +++ b/gso/worker.py @@ -84,6 +84,7 @@ celery = OrchestratorWorker( "gso.schedules.clean_old_tasks", "orchestrator.services.tasks", "gso.tasks.start_process", + "gso.tasks.massive_redeploy_base_config", ], ) diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index 8b1f4065d11869daa09e3ab124caf4b7484a89e0..3456115a4439eb0bdfe98e2291b9bcb65bde0952 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -110,6 +110,7 @@ LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partner LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners") LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks") LazyWorkflowInstance("gso.workflows.tasks.check_site_connectivity", "task_check_site_connectivity") +LazyWorkflowInstance("gso.workflows.tasks.redeploy_base_config", "task_redeploy_base_config") # Edge port workflows LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port") @@ -172,3 +173,30 @@ LazyWorkflowInstance("gso.workflows.vrf.create_vrf", "create_vrf") LazyWorkflowInstance("gso.workflows.vrf.modify_vrf_router_list", "modify_vrf_router_list") LazyWorkflowInstance("gso.workflows.vrf.redeploy_vrf", "redeploy_vrf") LazyWorkflowInstance("gso.workflows.vrf.terminate_vrf", "terminate_vrf") + +# R&E Peer workflows +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_peer.create_r_and_e_peer", "create_r_and_e_peer") +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_peer.modify_r_and_e_peer", "modify_r_and_e_peer") +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_peer.terminate_r_and_e_peer", "terminate_r_and_e_peer") +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_peer.migrate_r_and_e_peer", "migrate_r_and_e_peer") +LazyWorkflowInstance( + "gso.workflows.l3_core_service.r_and_e_peer.create_imported_r_and_e_peer", "create_imported_r_and_e_peer" +) +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_peer.import_r_and_e_peer", "import_r_and_e_peer") +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_peer.validate_r_and_e_peer", "validate_r_and_e_peer") +LazyWorkflowInstance( + "gso.workflows.l3_core_service.r_and_e_peer.validate_r_and_e_peer_prefix_list", "validate_r_and_e_peer_prefix_list" +) + +# R&E LHCONE workflows +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_lhcone.create_r_and_e_lhcone", "create_r_and_e_lhcone") +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_lhcone.modify_r_and_e_lhcone", "modify_r_and_e_lhcone") +LazyWorkflowInstance( + "gso.workflows.l3_core_service.r_and_e_lhcone.terminate_r_and_e_lhcone", "terminate_r_and_e_lhcone" +) +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_lhcone.migrate_r_and_e_lhcone", "migrate_r_and_e_lhcone") +LazyWorkflowInstance( + "gso.workflows.l3_core_service.r_and_e_lhcone.create_imported_r_and_e_lhcone", "create_imported_r_and_e_lhcone" +) +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_lhcone.import_r_and_e_lhcone", "import_r_and_e_lhcone") +LazyWorkflowInstance("gso.workflows.l3_core_service.r_and_e_lhcone.validate_r_and_e_lhcone", "validate_r_and_e_lhcone") diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/__init__.py b/gso/workflows/l3_core_service/r_and_e_lhcone/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..01212adc155b487bd805ec8ba90114e1b1e4b4d1 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/__init__.py @@ -0,0 +1 @@ +"""R&E LHCONE service workflows.""" diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/create_imported_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/create_imported_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..a4447da019a640d0c0832ebf8f5baa5be0a1a2c9 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/create_imported_r_and_e_lhcone.py @@ -0,0 +1,43 @@ +"""A creation workflow for adding an existing Imported R&E LHCONE to the service database.""" + +from orchestrator import workflow +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle +from orchestrator.workflow import StepList, begin, done, step +from orchestrator.workflows.steps import resync, set_status, store_process_subscription + +from gso.products import ProductName +from gso.products.product_types.r_and_e_lhcone import ImportedRAndELHCOneInactive +from gso.services.partners import get_partner_by_name +from gso.services.subscriptions import get_product_id_by_name +from gso.workflows.l3_core_service.base_create_imported_l3_core_service import ( + initial_input_form_generator, + initialize_subscription, +) + + +@step("Create subscription") +def create_subscription(partner: str) -> dict: + """Create a new subscription object in the database.""" + partner_id = get_partner_by_name(partner).partner_id + product_id = get_product_id_by_name(ProductName.IMPORTED_R_AND_E_LHCONE) + subscription = ImportedRAndELHCOneInactive.from_product_id(product_id, partner_id) + return {"subscription": subscription, "subscription_id": subscription.subscription_id} + + +@workflow( + "Create Imported R&E LHCONE", + initial_input_form=initial_input_form_generator, + target=Target.CREATE, +) +def create_imported_r_and_e_lhcone() -> StepList: + """Import an R&E LHCONE without provisioning it.""" + return ( + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/create_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/create_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..2063c415f9f2ed1e6cb49a3b32ea4b5fabefecc7 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/create_r_and_e_lhcone.py @@ -0,0 +1,61 @@ +"""Create R&E LHCONE subscription workflow.""" + +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle +from orchestrator.workflow import StepList, begin, done, step, workflow +from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from orchestrator.workflows.utils import wrap_create_initial_input_form +from pydantic_forms.types import State, UUIDstr + +from gso.products.product_types.r_and_e_lhcone import RAndELHCOneInactive +from gso.services.lso_client import lso_interaction +from gso.utils.workflow_steps import prompt_sharepoint_checklist_url, start_moodi, stop_moodi +from gso.workflows.l3_core_service.base_create_l3_core_service import ( + check_bgp_peers, + check_sbp_functionality, + create_new_sharepoint_checklist, + deploy_bgp_peers_dry, + deploy_bgp_peers_real, + initial_input_form_generator, + initialize_subscription, + provision_sbp_dry, + provision_sbp_real, + update_dns_records, +) + + +@step("Create subscription") +def create_subscription(product: UUIDstr, partner: str) -> State: + """Create a new subscription object in the database.""" + subscription = RAndELHCOneInactive.from_product_id(product, partner) + + return {"subscription": subscription, "subscription_id": subscription.subscription_id} + + +@workflow( + "Create R&E LHCONE", + initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), + target=Target.CREATE, +) +def create_r_and_e_lhcone() -> StepList: + """Create a new R&E LHCONE subscription.""" + return ( + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> start_moodi() + >> lso_interaction(provision_sbp_dry) + >> lso_interaction(provision_sbp_real) + >> lso_interaction(check_sbp_functionality) + >> lso_interaction(deploy_bgp_peers_dry) + >> lso_interaction(deploy_bgp_peers_real) + >> lso_interaction(check_bgp_peers) + >> update_dns_records + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> create_new_sharepoint_checklist + >> prompt_sharepoint_checklist_url + >> stop_moodi() + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/import_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/import_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..d298e00a464574d04411b8e5f69200057b9d6160 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/import_r_and_e_lhcone.py @@ -0,0 +1,37 @@ +"""A modification workflow for migrating an `ImportedRAndELHCOne` to an `RAndELHCOne` subscription.""" + +from orchestrator.targets import Target +from orchestrator.workflow import StepList, done, init, step, workflow +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic_forms.types import State, UUIDstr + +from gso.products import ProductName +from gso.products.product_types.r_and_e_lhcone import ImportedRAndELHCOne, RAndELHCOne +from gso.services.partners import get_partner_by_id +from gso.services.subscriptions import get_product_id_by_name + + +@step("Create an R&E LHCONE subscription") +def import_r_and_e_lhcone_subscription(subscription_id: UUIDstr) -> State: + """Take an imported subscription, and turn it into an R&E LHCONE subscription.""" + old_l3_core_service = ImportedRAndELHCOne.from_subscription(subscription_id) + new_product_id = get_product_id_by_name(ProductName.R_AND_E_LHCONE) + new_subscription = RAndELHCOne.from_other_product(old_l3_core_service, new_product_id) # type: ignore[arg-type] + new_subscription.description = ( + f"{new_subscription.product.name} service for {get_partner_by_id(new_subscription.customer_id).name}" + ) + return {"subscription": new_subscription} + + +@workflow("Import R&E LHCONE", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None)) +def import_r_and_e_lhcone() -> StepList: + """Modify an imported subscription into an R&E LHCONE subscription to complete the import.""" + return ( + init + >> store_process_subscription(Target.MODIFY) + >> unsync + >> import_r_and_e_lhcone_subscription + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/migrate_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/migrate_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..d5d5a611a248cfaada4a9d536f78b41dcc611efa --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/migrate_r_and_e_lhcone.py @@ -0,0 +1,69 @@ +"""A modification workflow that migrates an R&E LHCONE Service to a new Edge Port. + +In one run of a migration workflow, only a single Access Port can be replaced. This is due to the nature of these +services. When a service is dually homed, for example, only one of the Access Ports should be migrated. The other one +will remain the way it is. + +At the start of the workflow, the operator will select one Access Port that is to be migrated, and will then select a +destination Edge Port that this service should be placed on. All other Access Ports will be left as-is. +""" + +from orchestrator import workflow +from orchestrator.targets import Target +from orchestrator.workflow import StepList, begin, conditional, done +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.services.lso_client import lso_interaction +from gso.utils.workflow_steps import IS_HUMAN_INITIATED_WF_KEY, start_moodi, stop_moodi +from gso.workflows.l3_core_service.base_migrate_l3_core_service import ( + deactivate_bgp_dry, + deactivate_bgp_real, + deactivate_sbp_dry, + deactivate_sbp_real, + deploy_bgp_session_dry, + deploy_bgp_session_real, + deploy_destination_ep_dry, + deploy_destination_ep_real, + generate_scoped_subscription_model, + inform_operator_traffic_check, + initial_input_form, + inject_partner_name, + show_bgp_neighbors, + update_dns_records, + update_subscription_model, +) + + +@workflow( + "Migrate R&E LHCONE", + initial_input_form=wrap_modify_initial_input_form(initial_input_form), + target=Target.MODIFY, +) +def migrate_r_and_e_lhcone() -> StepList: + """Migrate an R&E LHCONE Service to a destination Edge Port.""" + is_human_initiated_wf = conditional(lambda state: bool(state.get(IS_HUMAN_INITIATED_WF_KEY))) + + return ( + begin + >> store_process_subscription(Target.MODIFY) + >> inject_partner_name + >> is_human_initiated_wf(lso_interaction(show_bgp_neighbors)) + >> is_human_initiated_wf(lso_interaction(deactivate_bgp_dry)) + >> is_human_initiated_wf(lso_interaction(deactivate_bgp_real)) + >> is_human_initiated_wf(lso_interaction(deactivate_sbp_dry)) + >> is_human_initiated_wf(lso_interaction(deactivate_sbp_real)) + >> is_human_initiated_wf(inform_operator_traffic_check) + >> unsync + >> generate_scoped_subscription_model + >> start_moodi() + >> lso_interaction(deploy_destination_ep_dry) + >> lso_interaction(deploy_destination_ep_real) + >> lso_interaction(deploy_bgp_session_dry) + >> lso_interaction(deploy_bgp_session_real) + >> update_dns_records + >> update_subscription_model + >> resync + >> stop_moodi() + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/modify_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/modify_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..7182a48b221894630cd96da21d426ad1ea19dceb --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/modify_r_and_e_lhcone.py @@ -0,0 +1,38 @@ +"""Modification workflow for an R&E LHCONE subscription.""" + +from orchestrator import begin, conditional, done, workflow +from orchestrator.targets import Target +from orchestrator.workflow import StepList +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.workflows.l3_core_service.base_modify_l3_core_service import ( + Operation, + create_new_sbp, + initial_input_form_generator, + modify_existing_sbp, + remove_old_sbp, +) + + +@workflow( + "Modify R&E LHCONE", + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), + target=Target.MODIFY, +) +def modify_r_and_e_lhcone() -> StepList: + """Modify R&E LHCONE subscription.""" + access_port_is_added = conditional(lambda state: state["operation"] == Operation.ADD) + access_port_is_removed = conditional(lambda state: state["operation"] == Operation.REMOVE) + access_port_is_modified = conditional(lambda state: state["operation"] == Operation.EDIT) + + return ( + begin + >> store_process_subscription(Target.MODIFY) + >> unsync + >> access_port_is_added(create_new_sbp) + >> access_port_is_removed(remove_old_sbp) + >> access_port_is_modified(modify_existing_sbp) + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/terminate_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/terminate_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..5ac0216ce7ee6c5e1a4eadd48cd3dce59b9254bd --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/terminate_r_and_e_lhcone.py @@ -0,0 +1,42 @@ +"""Workflow for terminating an R&E LHCONE subscription.""" + +from orchestrator import begin, workflow +from orchestrator.forms import SubmitFormPage +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle +from orchestrator.workflow import StepList, done +from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic_forms.types import FormGenerator, UUIDstr + +from gso.products.product_types.r_and_e_lhcone import RAndELHCOne +from gso.utils.types.tt_number import TTNumber + + +def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: + """Initial input form generator for terminating an R&E LHCONE subscription.""" + subscription = RAndELHCOne.from_subscription(subscription_id) + + class TerminateForm(SubmitFormPage): + tt_number: TTNumber + + user_input = yield TerminateForm + + return {"subscription": subscription} | user_input.model_dump() + + +@workflow( + "Terminate R&E LHCONE", + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), + target=Target.TERMINATE, +) +def terminate_r_and_e_lhcone() -> StepList: + """Terminate an R&E LHCONE subscription.""" + return ( + begin + >> store_process_subscription(Target.TERMINATE) + >> unsync + >> set_status(SubscriptionLifecycle.TERMINATED) + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_lhcone/validate_r_and_e_lhcone.py b/gso/workflows/l3_core_service/r_and_e_lhcone/validate_r_and_e_lhcone.py new file mode 100644 index 0000000000000000000000000000000000000000..435188401d93e9b5925e73aa38f4b9ad16885b07 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_lhcone/validate_r_and_e_lhcone.py @@ -0,0 +1,30 @@ +"""Validation workflow for R&E LHCONE subscription objects.""" + +from orchestrator.targets import Target +from orchestrator.workflow import StepList, begin, done, workflow +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.services.lso_client import anonymous_lso_interaction +from gso.workflows.l3_core_service.base_validate_l3_core_service import ( + build_fqdn_list, + validate_bgp_peers, + validate_dns_records, + validate_sbp_config, +) + + +@workflow("Validate R&E LHCONE", target=Target.SYSTEM, initial_input_form=(wrap_modify_initial_input_form(None))) +def validate_r_and_e_lhcone() -> StepList: + """Validate an existing R&E LHCONE subscription.""" + return ( + begin + >> store_process_subscription(Target.SYSTEM) + >> unsync + >> build_fqdn_list + >> anonymous_lso_interaction(validate_sbp_config) + >> anonymous_lso_interaction(validate_bgp_peers) + >> validate_dns_records + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/__init__.py b/gso/workflows/l3_core_service/r_and_e_peer/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3f833e7f1e0df9092e1cc064ced58c948dfa9bb1 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/__init__.py @@ -0,0 +1 @@ +"""R&E Peer service workflows.""" diff --git a/gso/workflows/l3_core_service/r_and_e_peer/create_imported_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/create_imported_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..6e56e435edefda5fcfd09cab33ad291483093898 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/create_imported_r_and_e_peer.py @@ -0,0 +1,43 @@ +"""A creation workflow for adding an existing Imported R&E Peer to the service database.""" + +from orchestrator import workflow +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle +from orchestrator.workflow import StepList, begin, done, step +from orchestrator.workflows.steps import resync, set_status, store_process_subscription + +from gso.products import ProductName +from gso.products.product_types.r_and_e_peer import ImportedRAndEPeerInactive +from gso.services.partners import get_partner_by_name +from gso.services.subscriptions import get_product_id_by_name +from gso.workflows.l3_core_service.base_create_imported_l3_core_service import ( + initial_input_form_generator, + initialize_subscription, +) + + +@step("Create subscription") +def create_subscription(partner: str) -> dict: + """Create a new subscription object in the database.""" + partner_id = get_partner_by_name(partner).partner_id + product_id = get_product_id_by_name(ProductName.IMPORTED_R_AND_E_PEER) + subscription = ImportedRAndEPeerInactive.from_product_id(product_id, partner_id) + return {"subscription": subscription, "subscription_id": subscription.subscription_id} + + +@workflow( + "Create Imported R&E Peer", + initial_input_form=initial_input_form_generator, + target=Target.CREATE, +) +def create_imported_r_and_e_peer() -> StepList: + """Import an R&E Peer without provisioning it.""" + return ( + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/create_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/create_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..93493e9d3749b6b7ff7811914e5d90189f174b5e --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/create_r_and_e_peer.py @@ -0,0 +1,61 @@ +"""Create R&E Peer subscription workflow.""" + +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle +from orchestrator.workflow import StepList, begin, done, step, workflow +from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from orchestrator.workflows.utils import wrap_create_initial_input_form +from pydantic_forms.types import State, UUIDstr + +from gso.products.product_types.r_and_e_peer import RAndEPeerInactive +from gso.services.lso_client import lso_interaction +from gso.utils.workflow_steps import prompt_sharepoint_checklist_url, start_moodi, stop_moodi +from gso.workflows.l3_core_service.base_create_l3_core_service import ( + check_bgp_peers, + check_sbp_functionality, + create_new_sharepoint_checklist, + deploy_bgp_peers_dry, + deploy_bgp_peers_real, + initial_input_form_generator, + initialize_subscription, + provision_sbp_dry, + provision_sbp_real, + update_dns_records, +) + + +@step("Create subscription") +def create_subscription(product: UUIDstr, partner: str) -> State: + """Create a new subscription object in the database.""" + subscription = RAndEPeerInactive.from_product_id(product, partner) + + return {"subscription": subscription, "subscription_id": subscription.subscription_id} + + +@workflow( + "Create R&E Peer", + initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), + target=Target.CREATE, +) +def create_r_and_e_peer() -> StepList: + """Create a new R&E Peer subscription.""" + return ( + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> start_moodi() + >> lso_interaction(provision_sbp_dry) + >> lso_interaction(provision_sbp_real) + >> lso_interaction(check_sbp_functionality) + >> lso_interaction(deploy_bgp_peers_dry) + >> lso_interaction(deploy_bgp_peers_real) + >> lso_interaction(check_bgp_peers) + >> update_dns_records + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> create_new_sharepoint_checklist + >> prompt_sharepoint_checklist_url + >> stop_moodi() + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/import_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/import_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..2348d6e9b7166f116da099428339dda3ada9323d --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/import_r_and_e_peer.py @@ -0,0 +1,37 @@ +"""A modification workflow for migrating an `ImportedRAndEPeer` to a `RAndEPeer` subscription.""" + +from orchestrator.targets import Target +from orchestrator.workflow import StepList, done, init, step, workflow +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic_forms.types import State, UUIDstr + +from gso.products import ProductName +from gso.products.product_types.r_and_e_peer import ImportedRAndEPeer, RAndEPeer +from gso.services.partners import get_partner_by_id +from gso.services.subscriptions import get_product_id_by_name + + +@step("Create an R&E Peer subscription") +def import_r_and_e_peer_subscription(subscription_id: UUIDstr) -> State: + """Take an imported subscription, and turn it into an R&E Peer subscription.""" + old_l3_core_service = ImportedRAndEPeer.from_subscription(subscription_id) + new_product_id = get_product_id_by_name(ProductName.R_AND_E_PEER) + new_subscription = RAndEPeer.from_other_product(old_l3_core_service, new_product_id) # type: ignore[arg-type] + new_subscription.description = ( + f"{new_subscription.product.name} service for {get_partner_by_id(new_subscription.customer_id).name}" + ) + return {"subscription": new_subscription} + + +@workflow("Import R&E Peer", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None)) +def import_r_and_e_peer() -> StepList: + """Modify an imported subscription into an R&E Peer subscription to complete the import.""" + return ( + init + >> store_process_subscription(Target.MODIFY) + >> unsync + >> import_r_and_e_peer_subscription + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/migrate_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/migrate_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..a72a767683e11b1b384acd756aa1609bcbf4ce50 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/migrate_r_and_e_peer.py @@ -0,0 +1,69 @@ +"""A modification workflow that migrates an R&E Peer Service to a new Edge Port. + +In one run of a migration workflow, only a single Access Port can be replaced. This is due to the nature of these +services. When a service is dually homed, for example, only one of the Access Ports should be migrated. The other one +will remain the way it is. + +At the start of the workflow, the operator will select one Access Port that is to be migrated, and will then select a +destination Edge Port that this service should be placed on. All other Access Ports will be left as-is. +""" + +from orchestrator import workflow +from orchestrator.targets import Target +from orchestrator.workflow import StepList, begin, conditional, done +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.services.lso_client import lso_interaction +from gso.utils.workflow_steps import IS_HUMAN_INITIATED_WF_KEY, start_moodi, stop_moodi +from gso.workflows.l3_core_service.base_migrate_l3_core_service import ( + deactivate_bgp_dry, + deactivate_bgp_real, + deactivate_sbp_dry, + deactivate_sbp_real, + deploy_bgp_session_dry, + deploy_bgp_session_real, + deploy_destination_ep_dry, + deploy_destination_ep_real, + generate_scoped_subscription_model, + inform_operator_traffic_check, + initial_input_form, + inject_partner_name, + show_bgp_neighbors, + update_dns_records, + update_subscription_model, +) + + +@workflow( + "Migrate R&E Peer", + initial_input_form=wrap_modify_initial_input_form(initial_input_form), + target=Target.MODIFY, +) +def migrate_r_and_e_peer() -> StepList: + """Migrate an R&E Peer Service to a destination Edge Port.""" + is_human_initiated_wf = conditional(lambda state: bool(state.get(IS_HUMAN_INITIATED_WF_KEY))) + + return ( + begin + >> store_process_subscription(Target.MODIFY) + >> inject_partner_name + >> is_human_initiated_wf(lso_interaction(show_bgp_neighbors)) + >> is_human_initiated_wf(lso_interaction(deactivate_bgp_dry)) + >> is_human_initiated_wf(lso_interaction(deactivate_bgp_real)) + >> is_human_initiated_wf(lso_interaction(deactivate_sbp_dry)) + >> is_human_initiated_wf(lso_interaction(deactivate_sbp_real)) + >> is_human_initiated_wf(inform_operator_traffic_check) + >> unsync + >> generate_scoped_subscription_model + >> start_moodi() + >> lso_interaction(deploy_destination_ep_dry) + >> lso_interaction(deploy_destination_ep_real) + >> lso_interaction(deploy_bgp_session_dry) + >> lso_interaction(deploy_bgp_session_real) + >> update_dns_records + >> update_subscription_model + >> resync + >> stop_moodi() + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/modify_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/modify_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..f04009dfe04e2e9c7fac2c1c0e766baebecf7886 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/modify_r_and_e_peer.py @@ -0,0 +1,38 @@ +"""Modification workflow for an R&E Peer subscription.""" + +from orchestrator import begin, conditional, done, workflow +from orchestrator.targets import Target +from orchestrator.workflow import StepList +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.workflows.l3_core_service.base_modify_l3_core_service import ( + Operation, + create_new_sbp, + initial_input_form_generator, + modify_existing_sbp, + remove_old_sbp, +) + + +@workflow( + "Modify R&E Peer", + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), + target=Target.MODIFY, +) +def modify_r_and_e_peer() -> StepList: + """Modify R&E Peer subscription.""" + access_port_is_added = conditional(lambda state: state["operation"] == Operation.ADD) + access_port_is_removed = conditional(lambda state: state["operation"] == Operation.REMOVE) + access_port_is_modified = conditional(lambda state: state["operation"] == Operation.EDIT) + + return ( + begin + >> store_process_subscription(Target.MODIFY) + >> unsync + >> access_port_is_added(create_new_sbp) + >> access_port_is_removed(remove_old_sbp) + >> access_port_is_modified(modify_existing_sbp) + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/terminate_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/terminate_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..f9dc335496791775f27c2b41d6428580e3ace22a --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/terminate_r_and_e_peer.py @@ -0,0 +1,42 @@ +"""Workflow for terminating an R&E Peer subscription.""" + +from orchestrator import begin, workflow +from orchestrator.forms import SubmitFormPage +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle +from orchestrator.workflow import StepList, done +from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic_forms.types import FormGenerator, UUIDstr + +from gso.products.product_types.r_and_e_peer import RAndEPeer +from gso.utils.types.tt_number import TTNumber + + +def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: + """Initial input form generator for terminating an R&E Peer subscription.""" + subscription = RAndEPeer.from_subscription(subscription_id) + + class TerminateForm(SubmitFormPage): + tt_number: TTNumber + + user_input = yield TerminateForm + + return {"subscription": subscription} | user_input.model_dump() + + +@workflow( + "Terminate R&E Peer", + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), + target=Target.TERMINATE, +) +def terminate_r_and_e_peer() -> StepList: + """Terminate an R&E Peer subscription.""" + return ( + begin + >> store_process_subscription(Target.TERMINATE) + >> unsync + >> set_status(SubscriptionLifecycle.TERMINATED) + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/validate_r_and_e_peer.py b/gso/workflows/l3_core_service/r_and_e_peer/validate_r_and_e_peer.py new file mode 100644 index 0000000000000000000000000000000000000000..08cca1f643d0b912df46fd7b07f0f65ddc0cfb68 --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/validate_r_and_e_peer.py @@ -0,0 +1,30 @@ +"""Validation workflow for R&E Peer subscription objects.""" + +from orchestrator.targets import Target +from orchestrator.workflow import StepList, begin, done, workflow +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.services.lso_client import anonymous_lso_interaction +from gso.workflows.l3_core_service.base_validate_l3_core_service import ( + build_fqdn_list, + validate_bgp_peers, + validate_dns_records, + validate_sbp_config, +) + + +@workflow("Validate R&E Peer", target=Target.SYSTEM, initial_input_form=(wrap_modify_initial_input_form(None))) +def validate_r_and_e_peer() -> StepList: + """Validate an existing R&E Peer subscription.""" + return ( + begin + >> store_process_subscription(Target.SYSTEM) + >> unsync + >> build_fqdn_list + >> anonymous_lso_interaction(validate_sbp_config) + >> anonymous_lso_interaction(validate_bgp_peers) + >> validate_dns_records + >> resync + >> done + ) diff --git a/gso/workflows/l3_core_service/r_and_e_peer/validate_r_and_e_peer_prefix_list.py b/gso/workflows/l3_core_service/r_and_e_peer/validate_r_and_e_peer_prefix_list.py new file mode 100644 index 0000000000000000000000000000000000000000..7d830113a7be51a731dd08b6cfbc225a3be4089a --- /dev/null +++ b/gso/workflows/l3_core_service/r_and_e_peer/validate_r_and_e_peer_prefix_list.py @@ -0,0 +1,141 @@ +"""Prefix Validation workflow for R&E Peer subscription objects.""" + +from typing import Any + +from orchestrator.config.assignee import Assignee +from orchestrator.domain import SubscriptionModel +from orchestrator.forms import SubmitFormPage +from orchestrator.targets import Target +from orchestrator.workflow import StepList, begin, conditional, done, inputstep, step, workflow +from orchestrator.workflows.steps import resync, store_process_subscription, unsync_unchecked +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic import Field +from pydantic_forms.types import FormGenerator, State, UUIDstr +from pydantic_forms.validators import Label + +from gso.services.lso_client import LSOState, anonymous_lso_interaction, lso_interaction +from gso.services.partners import get_partner_by_id +from gso.utils.shared_enums import Vendor + + +@step("Prepare list of all Access Ports") +def build_fqdn_list(subscription_id: UUIDstr) -> State: + """Build the list of all FQDNs in the access ports of L3 Core Service subscription, excluding Juniper devices.""" + subscription = SubscriptionModel.from_subscription(subscription_id) + ap_list = subscription.l3_core.ap_list # type: ignore[attr-defined] + ap_fqdn_list = [ + ap.sbp.edge_port.node.router_fqdn for ap in ap_list if ap.sbp.edge_port.node.vendor != Vendor.JUNIPER + ] + return {"ap_fqdn_list": ap_fqdn_list, "subscription": subscription, "subscription_was_in_sync": subscription.insync} + + +@step("[DRY RUN] Validate Prefix-Lists") +def validate_prefix_lists_dry(subscription: dict[str, Any], process_id: UUIDstr, ap_fqdn_list: list[str]) -> LSOState: + """Workflow step for running a playbook that validates prefix-lists in dry run mode.""" + extra_vars = { + "subscription": subscription, + "partner_name": get_partner_by_id(subscription["customer_id"]).name, + "dry_run": True, + "verb": "deploy", + "object": "prefix_list", + "is_verification_workflow": "true", + "commit_comment": f"GSO_PROCESS_ID: {process_id} - Validate prefix-lists for {subscription["description"]}", + } + + return { + "playbook_name": "gap_ansible/playbooks/validate_prefix_list.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(ap_fqdn_list)}}, + "extra_vars": extra_vars, + } + + +@step("Evaluate validation of Prefix-Lists") +def evaluate_result_has_diff(callback_result: dict) -> State: + """Evaluate the result of the playbook that validates prefix-lists.""" + return {"callback_result": callback_result, "prefix_list_drift": bool(callback_result["return_code"] != 0)} + + +@inputstep("Await operator confirmation", assignee=Assignee.SYSTEM) +def await_operator() -> FormGenerator: + """Show a form for the operator to start redeploying the prefix list that has drifted.""" + + class AwaitOperatorForm(SubmitFormPage): + info_label_a: Label = Field("A drift has been detected for this prefix list!", exclude=True) + info_label_b: Label = Field("Please continue this workflow to redeploy the drifted prefix list.", exclude=True) + + yield AwaitOperatorForm + + return {} + + +@step("[DRY RUN] Deploy Prefix-Lists") +def deploy_prefix_lists_dry(subscription: dict[str, Any], process_id: UUIDstr, ap_fqdn_list: list[str]) -> LSOState: + """Workflow step for running a playbook that deploys prefix-lists in dry run mode.""" + extra_vars = { + "subscription": subscription, + "partner_name": get_partner_by_id(subscription["customer_id"]).name, + "dry_run": True, + "verb": "deploy", + "object": "prefix_list", + "is_verification_workflow": "false", + "commit_comment": f"GSO_PROCESS_ID: {process_id} - Deploy prefix-lists for {subscription["description"]}", + } + + return { + "playbook_name": "gap_ansible/playbooks/deploy_prefix_list.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(ap_fqdn_list)}}, + "extra_vars": extra_vars, + } + + +@step("[REAL] Deploy Prefix-Lists") +def deploy_prefix_lists_real(subscription: dict[str, Any], process_id: UUIDstr, ap_fqdn_list: list[str]) -> LSOState: + """Workflow step for running a playbook that deploys prefix-lists.""" + extra_vars = { + "subscription": subscription, + "partner_name": get_partner_by_id(subscription["customer_id"]).name, + "dry_run": False, + "verb": "deploy", + "object": "prefix_list", + "is_verification_workflow": "false", + "commit_comment": f"GSO_PROCESS_ID: {process_id} - Deploy prefix-lists for {subscription["description"]}", + } + + return { + "playbook_name": "gap_ansible/playbooks/deploy_prefix_list.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(ap_fqdn_list)}}, + "extra_vars": extra_vars, + } + + +@workflow( + "Validate R&E Peer Prefix-List", target=Target.SYSTEM, initial_input_form=(wrap_modify_initial_input_form(None)) +) +def validate_r_and_e_peer_prefix_list() -> StepList: + """Validate prefix-lists for an existing R&E Peer subscription.""" + fqdn_list_is_empty = conditional(lambda state: state["ap_fqdn_list"] == []) + prefix_list_has_drifted = conditional(lambda state: bool(state["prefix_list_drift"])) + subscription_was_in_sync = conditional(lambda state: bool(state["subscription_was_in_sync"])) + + redeploy_prefix_list_steps = ( + begin + >> unsync_unchecked + >> await_operator + >> lso_interaction(deploy_prefix_lists_dry) + >> lso_interaction(deploy_prefix_lists_real) + >> subscription_was_in_sync(resync) + ) + prefix_list_validation_steps = ( + begin + >> anonymous_lso_interaction(validate_prefix_lists_dry, evaluate_result_has_diff) + >> prefix_list_has_drifted(redeploy_prefix_list_steps) + ) + + return ( + begin + >> store_process_subscription(Target.SYSTEM) + >> build_fqdn_list + >> fqdn_list_is_empty(done) + >> prefix_list_validation_steps + >> done + ) diff --git a/gso/workflows/l3_core_service/shared.py b/gso/workflows/l3_core_service/shared.py index 8370cb2b6e6fcecd122eadd3a15868a5ef78a11c..cf5320d26dc48288bf29108f178c20e22867e697 100644 --- a/gso/workflows/l3_core_service/shared.py +++ b/gso/workflows/l3_core_service/shared.py @@ -7,25 +7,45 @@ from gso.products.product_types.copernicus import Copernicus from gso.products.product_types.geant_ip import GeantIP from gso.products.product_types.ias import IAS from gso.products.product_types.lhcone import LHCOne +from gso.products.product_types.r_and_e_lhcone import RAndELHCOne +from gso.products.product_types.r_and_e_peer import RAndEPeer L3_PRODUCT_NAMES = [ ProductName.GEANT_IP, ProductName.IAS, ProductName.LHCONE, ProductName.COPERNICUS, + ProductName.R_AND_E_PEER, + ProductName.R_AND_E_LHCONE, +] +L3_CORE_SERVICE_PRODUCT_TYPES = [ + GeantIP.__name__, + IAS.__name__, + LHCOne.__name__, + Copernicus.__name__, + RAndEPeer.__name__, + RAndELHCOne.__name__, ] -L3_CORE_SERVICE_PRODUCT_TYPES = [GeantIP.__name__, IAS.__name__, LHCOne.__name__, Copernicus.__name__] assert len(L3_PRODUCT_NAMES) == len( # noqa: S101 L3_CORE_SERVICE_PRODUCT_TYPES ), "The number of L3 product names does not match the number of L3 core service product types." -L3ProductNameType = Literal[ProductName.IAS, ProductName.LHCONE, ProductName.COPERNICUS, ProductName.GEANT_IP] +L3ProductNameType = Literal[ + ProductName.IAS, + ProductName.LHCONE, + ProductName.COPERNICUS, + ProductName.GEANT_IP, + ProductName.R_AND_E_PEER, + ProductName.R_AND_E_LHCONE, +] L3_CREAT_IMPORTED_WF_MAP = { ProductName.COPERNICUS: "create_imported_copernicus", ProductName.GEANT_IP: "create_imported_geant_ip", ProductName.IAS: "create_imported_ias", ProductName.LHCONE: "create_imported_lhcone", + ProductName.R_AND_E_PEER: "create_imported_r_and_e_peer", + ProductName.R_AND_E_LHCONE: "create_imported_r_and_e_lhcone", } L3_CREATION_WF_MAP = { @@ -33,6 +53,8 @@ L3_CREATION_WF_MAP = { ProductName.GEANT_IP: "create_geant_ip", ProductName.IAS: "create_ias", ProductName.LHCONE: "create_lhcone", + ProductName.R_AND_E_PEER: "create_r_and_e_peer", + ProductName.R_AND_E_LHCONE: "create_r_and_e_lhcone", } L3_MIGRATION_WF_MAP = { @@ -40,6 +62,8 @@ L3_MIGRATION_WF_MAP = { ProductName.GEANT_IP: "migrate_geant_ip", ProductName.IAS: "migrate_ias", ProductName.LHCONE: "migrate_lhcone", + ProductName.R_AND_E_PEER: "migrate_r_and_e_peer", + ProductName.R_AND_E_LHCONE: "migrate_r_and_e_lhcone", } @@ -48,6 +72,8 @@ L3_MODIFICATION_WF_MAP = { ProductName.GEANT_IP: "modify_geant_ip", ProductName.IAS: "modify_ias", ProductName.LHCONE: "modify_lhcone", + ProductName.R_AND_E_PEER: "modify_r_and_e_peer", + ProductName.R_AND_E_LHCONE: "modify_r_and_e_lhcone", } @@ -56,6 +82,8 @@ L3_IMPORT_WF_MAP = { ProductName.IMPORTED_GEANT_IP: "import_geant_ip", ProductName.IMPORTED_IAS: "import_ias", ProductName.IMPORTED_LHCONE: "import_lhcone", + ProductName.IMPORTED_R_AND_E_PEER: "import_r_and_e_peer", + ProductName.IMPORTED_R_AND_E_LHCONE: "import_r_and_e_lhcone", } L3_VALIDATION_WF_MAP = { @@ -63,6 +91,8 @@ L3_VALIDATION_WF_MAP = { ProductName.IAS: "validate_ias", ProductName.LHCONE: "validate_lhcone", ProductName.COPERNICUS: "validate_copernicus", + ProductName.R_AND_E_PEER: "validate_r_and_e_peer", + ProductName.R_AND_E_LHCONE: "validate_r_and_e_lhcone", } L3_TERMINATION_WF_MAP = { @@ -70,4 +100,6 @@ L3_TERMINATION_WF_MAP = { ProductName.GEANT_IP: "terminate_geant_ip", ProductName.IAS: "terminate_ias", ProductName.LHCONE: "terminate_lhcone", + ProductName.R_AND_E_PEER: "terminate_r_and_e_peer", + ProductName.R_AND_E_LHCONE: "terminate_r_and_e_lhcone", } diff --git a/gso/workflows/router/redeploy_base_config.py b/gso/workflows/router/redeploy_base_config.py index 11c4264dc754f2e7289241a2324ba7b00dabf2c7..1fba2bc87bad1f166c40b0bdc4f23227169f8f69 100644 --- a/gso/workflows/router/redeploy_base_config.py +++ b/gso/workflows/router/redeploy_base_config.py @@ -11,13 +11,13 @@ run. After confirmation by an operator, the configuration is committed to the ma from orchestrator.forms import SubmitFormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target -from orchestrator.workflow import StepList, begin, done, workflow +from orchestrator.workflow import StepList, begin, conditional, done, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic_forms.types import FormGenerator, UUIDstr from gso.products.product_types.router import Router -from gso.services.lso_client import lso_interaction +from gso.services.lso_client import anonymous_lso_interaction, lso_interaction from gso.utils.types.tt_number import TTNumber from gso.utils.workflow_steps import deploy_base_config_dry, deploy_base_config_real @@ -28,6 +28,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: class RedeployBaseConfigForm(SubmitFormPage): info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?" tt_number: TTNumber + is_massive_redeploy: bool = False user_input = yield RedeployBaseConfigForm @@ -45,12 +46,16 @@ def redeploy_base_config() -> StepList: * Perform a dry run of deployment * Redeploy base config """ + is_not_massive_redeploy = conditional(lambda state: not bool(state.get("is_massive_redeploy"))) + is_massive_redeploy = conditional(lambda state: bool(state.get("is_massive_redeploy"))) + return ( begin >> store_process_subscription(Target.MODIFY) >> unsync - >> lso_interaction(deploy_base_config_dry) - >> lso_interaction(deploy_base_config_real) + >> is_not_massive_redeploy(lso_interaction(deploy_base_config_dry)) + >> is_not_massive_redeploy(lso_interaction(deploy_base_config_real)) + >> is_massive_redeploy(anonymous_lso_interaction(deploy_base_config_real)) >> resync >> done ) diff --git a/gso/workflows/tasks/redeploy_base_config.py b/gso/workflows/tasks/redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..e1b46a79347d8b02b8382cfeb1872a2feaa59e69 --- /dev/null +++ b/gso/workflows/tasks/redeploy_base_config.py @@ -0,0 +1,94 @@ +"""Task for redeploying base config on multiple routers at one. + +This task spawns multiple instances of the ``redeploy_base_config`` workflow, based on a list of Nokia routers given as +input by the operator. The operator can then +""" + +import json +from typing import Annotated + +from annotated_types import Len +from orchestrator.config.assignee import Assignee +from orchestrator.forms import SubmitFormPage +from orchestrator.targets import Target +from orchestrator.workflow import StepList, callback_step, conditional, done, init, inputstep, step, workflow +from pydantic import AfterValidator, ConfigDict +from pydantic_forms.types import FormGenerator, State, UUIDstr +from pydantic_forms.validators import LongText, validate_unique_list + +from gso.products.product_types.router import Router +from gso.services.subscriptions import get_active_router_subscriptions +from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task +from gso.utils.helpers import active_nokia_router_selector +from gso.utils.shared_enums import Vendor +from gso.utils.types.tt_number import TTNumber + + +def _input_form_generator() -> FormGenerator: + router_selection_list = Annotated[ # type: ignore[valid-type] + list[active_nokia_router_selector()], # type: ignore[misc] + AfterValidator(validate_unique_list), + Len(min_length=1), + ] + + all_active_nokia_routers = [ + router["subscription_id"] + for router in get_active_router_subscriptions() + if Router.from_subscription(router["subscription_id"]).router.vendor == Vendor.NOKIA + ] + + class RedeployBaseConfigForm(SubmitFormPage): + model_config = ConfigDict(title="Redeploy base config on multiple routers") + + tt_number: TTNumber + selected_routers: router_selection_list = all_active_nokia_routers # type: ignore[valid-type] + + user_input = yield RedeployBaseConfigForm + return user_input.model_dump() | {"failed_wfs": {}, "successful_wfs": {}} + + +@step("Start worker to redeploy base config on selected routers") +def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr], callback_route: str) -> State: + """Start the massive redeploy base config task with the selected routers.""" + # TODO if in the future you changed UUIDstr to UUID, you need to convert them to string when passing to the task + massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5) # type: ignore[attr-defined] + + return {"failed_wfs": {}, "successful_wfs": {}} + + +@step("Evaluate provisioning proxy result") +def evaluate_results(callback_result: dict) -> State: + """Evaluate the result of the provisioning proxy callback.""" + failed_wfs = callback_result.pop("failed_wfs", {}) + successful_wfs = callback_result.pop("successful_wfs", {}) + return {"callback_result": callback_result, "failed_wfs": failed_wfs, "successful_wfs": successful_wfs} + + +@inputstep("Some workflows have failed", assignee=Assignee.SYSTEM) +def workflows_failed_to_prompt(failed_wfs: dict, successful_wfs: dict) -> FormGenerator: + """Prompt the operator that some workflows have failed to start.""" + + class WFFailurePrompt(SubmitFormPage): + model_config = ConfigDict(title="Some redeploy workflows have failed, please inspect the list below") + failed_workflows: LongText = json.dumps(failed_wfs, indent=4) + successful_workflows: LongText = json.dumps(successful_wfs, indent=4) + + yield WFFailurePrompt + return {} + + +@workflow("Redeploy base config on multiple routers", initial_input_form=_input_form_generator, target=Target.SYSTEM) +def task_redeploy_base_config() -> StepList: + """Gather a list of routers from the operator to redeploy base config onto.""" + some_failed_to_start = conditional(lambda state: len(state.get("failed_wfs", {})) > 0) + + return ( + init + >> callback_step( + name="Start running redeploy workflows on selected routers", + action_step=start_redeploy_workflows, + validate_step=evaluate_results, + ) + >> some_failed_to_start(workflows_failed_to_prompt) + >> done + ) diff --git a/setup.py b/setup.py index c6b178548776bd67429fd9ad7d2246cff2b91479..7074b11a0af2530071866ebc7d889539188b9c71 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import find_packages, setup setup( name="geant-service-orchestrator", - version="3.9", + version="3.10", author="GÉANT Orchestration and Automation Team", author_email="goat@geant.org", description="GÉANT Service Orchestrator", diff --git a/test/conftest.py b/test/conftest.py index 44e2901b6623a1105ca2d5b8f9c0a133fe162e5f..3e4a765d70cfefbd4ae800de2e18d35423ec8d6f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -34,7 +34,6 @@ from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient from urllib3_mock import Responses -import gso.services.mailer from gso.services.partners import PartnerSchema, create_partner from gso.services.subscriptions import is_resource_type_value_unique from test.fixtures import * # noqa: F403 @@ -57,6 +56,8 @@ def pytest_configure(config): # Set environment variables for the test session os.environ["OSS_PARAMS_FILENAME"] = "gso/oss-params-example.json" os.environ["TESTING"] = "true" + os.environ["CELERY_TASK_ALWAYS_EAGER"] = "true" + os.environ["CELERY_TASK_EAGER_PROPAGATES"] = "true" # Register finalizers to clean up after tests are done def cleanup() -> None: @@ -593,11 +594,15 @@ def responses(): def _no_mail(monkeypatch): """Remove sending mails from all tests.""" - def send_mail(subject: str, body: str, *, destination: str | None = None) -> None: + def fake_send_mail(subject: str, body: str, *, destination: str | None = None) -> None: email = f"*** SENT AN EMAIL ***\nTO: {destination}\nSUBJECT: {subject}\nCONTENT:\n{body}" logger.info(email) - monkeypatch.setattr(gso.services.mailer, "send_mail", send_mail) + monkeypatch.setattr( + "gso.services.mailer.send_mail", + fake_send_mail, + raising=True, + ) @pytest.fixture(autouse=True) @@ -605,7 +610,7 @@ def _no_lso_interactions(monkeypatch): """Remove all external LSO calls.""" @step("Mocked playbook execution") - def _execute_playbook( + def fake_execute_playbook( playbook_name: str, callback_route: str, inventory: dict, extra_vars: dict, process_id: UUIDstr ) -> None: assert playbook_name @@ -614,4 +619,8 @@ def _no_lso_interactions(monkeypatch): assert extra_vars assert process_id - monkeypatch.setattr(gso.services.lso_client, "_execute_playbook", _execute_playbook) + monkeypatch.setattr( + "gso.services.lso_client._execute_playbook", + fake_execute_playbook, + raising=True, + ) diff --git a/test/fixtures/__init__.py b/test/fixtures/__init__.py index b2ee468ffd7c698d291e1390dfd79ae19b10f1be..5fa18f0139c571571e8692285e3700a74263f9bd 100644 --- a/test/fixtures/__init__.py +++ b/test/fixtures/__init__.py @@ -11,6 +11,8 @@ from test.fixtures.l3_core_service_fixtures import ( l3_core_service_subscription_factory, lhcone_subscription_factory, make_subscription_factory, + r_and_e_lhcone_subscription_factory, + r_and_e_peer_subscription_factory, save_l3_core_subscription, service_binding_port_factory, ) @@ -42,6 +44,8 @@ __all__ = [ "make_subscription_factory", "office_router_subscription_factory", "opengear_subscription_factory", + "r_and_e_lhcone_subscription_factory", + "r_and_e_peer_subscription_factory", "router_subscription_factory", "save_l3_core_subscription", "service_binding_port_factory", diff --git a/test/fixtures/l3_core_service_fixtures.py b/test/fixtures/l3_core_service_fixtures.py index d23bc42978c325220fa304782f6192841b25df7c..bac0fbc3c310b2b0bd3dbb3228c4fd320dee7104 100644 --- a/test/fixtures/l3_core_service_fixtures.py +++ b/test/fixtures/l3_core_service_fixtures.py @@ -19,6 +19,8 @@ from gso.products.product_types.edge_port import EdgePort from gso.products.product_types.geant_ip import GeantIPInactive, ImportedGeantIPInactive from gso.products.product_types.ias import IASInactive, ImportedIASInactive from gso.products.product_types.lhcone import ImportedLHCOneInactive, LHCOneInactive +from gso.products.product_types.r_and_e_lhcone import ImportedRAndELHCOneInactive, RAndELHCOneInactive +from gso.products.product_types.r_and_e_peer import ImportedRAndEPeerInactive, RAndEPeerInactive from gso.services.subscriptions import get_product_id_by_name from gso.utils.shared_enums import APType, SBPType from gso.utils.types.ip_address import IPAddress @@ -28,6 +30,8 @@ PRODUCT_IMPORTED_MAP = { ProductName.GEANT_IP: ProductName.IMPORTED_GEANT_IP, ProductName.COPERNICUS: ProductName.IMPORTED_COPERNICUS, ProductName.LHCONE: ProductName.IMPORTED_LHCONE, + ProductName.R_AND_E_PEER: ProductName.IMPORTED_R_AND_E_PEER, + ProductName.R_AND_E_LHCONE: ProductName.IMPORTED_R_AND_E_LHCONE, } @@ -288,12 +292,42 @@ def lhcone_subscription_factory(make_subscription_factory): return factory +@pytest.fixture() +def r_and_e_peer_subscription_factory(make_subscription_factory): + def factory(*args, **kwargs): + return make_subscription_factory( + product_name=ProductName.R_AND_E_PEER, + imported_class=ImportedRAndEPeerInactive, + native_class=RAndEPeerInactive, + *args, # noqa: B026 + **kwargs, + ) + + return factory + + +@pytest.fixture() +def r_and_e_lhcone_subscription_factory(make_subscription_factory): + def factory(*args, **kwargs): + return make_subscription_factory( + product_name=ProductName.R_AND_E_LHCONE, + imported_class=ImportedRAndELHCOneInactive, + native_class=RAndELHCOneInactive, + *args, # noqa: B026 + **kwargs, + ) + + return factory + + @pytest.fixture() def l3_core_service_subscription_factory( ias_subscription_factory, geant_ip_subscription_factory, copernicus_subscription_factory, lhcone_subscription_factory, + r_and_e_peer_subscription_factory, + r_and_e_lhcone_subscription_factory, ) -> callable: def factory(product_name: ProductName, *args, **kwargs): if product_name == ProductName.IAS: @@ -305,6 +339,12 @@ def l3_core_service_subscription_factory( if product_name == ProductName.COPERNICUS: return copernicus_subscription_factory(*args, **kwargs) + if product_name == ProductName.R_AND_E_PEER: + return r_and_e_peer_subscription_factory(*args, **kwargs) + + if product_name == ProductName.R_AND_E_LHCONE: + return r_and_e_lhcone_subscription_factory(*args, **kwargs) + return lhcone_subscription_factory(*args, **kwargs) return factory diff --git a/test/services/test_lso_client.py b/test/services/test_lso_client.py index 78af95fc212a205f906ddb8e34d1bf5e5fec112c..7fa33323e66960e1094c41263859e2e0dfb26a2b 100644 --- a/test/services/test_lso_client.py +++ b/test/services/test_lso_client.py @@ -27,4 +27,4 @@ def test_replace_unicode_in_lso_call_success(mock_post, faker): execute_playbook = _execute_playbook.__wrapped__ execute_playbook("playbook.yaml", "/api/callback_route", {}, extra_vars, mocked_uuid) - mock_post.assert_called_once_with("https://localhost:44444/api/playbook", json=expected_parameters, timeout=10) + mock_post.assert_called_once_with("https://localhost:44444/api/playbook/", json=expected_parameters, timeout=10) diff --git a/test/tasks/__init__.py b/test/tasks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/test/tasks/test_masssive_redeploy_base_config.py b/test/tasks/test_masssive_redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..394b2ca21f2d01139f57d49563f317bd26a7b34d --- /dev/null +++ b/test/tasks/test_masssive_redeploy_base_config.py @@ -0,0 +1,193 @@ +import logging +import uuid +from types import SimpleNamespace +from unittest.mock import patch + +from orchestrator.workflow import ProcessStatus +from pydantic import BaseModel, ValidationError +from pydantic_forms.exceptions import FormValidationError +from pydantic_i18n import PydanticI18n + +from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task + + +@patch("gso.tasks.massive_redeploy_base_config.requests.post") +@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params") +@patch("gso.tasks.massive_redeploy_base_config.start_process") +@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop") +def test_all_status_branches( + mock_wait_for_workflow_to_stop, mock_start_process, mock_load_oss, mock_post, router_subscription_factory, faker +): + """ + Test: + - Completed → successful_wfs + - Aborted → failed_wfs["Workflow was aborted"] + - Failed+reason → failed_wfs[reason] + - Failed no reason → default message + - Other status → generic formatting + """ + router_ids = [ + router_subscription_factory(router_fqdn=fqdn).subscription_id + for fqdn in [ + "r1.example.com", + "r2.example.com", + "r3.example.com", + "r4.example.com", + "r5.example.com", + ] + ] + + # stub start_process → return a dummy process_id + mock_start_process.side_effect = lambda *args, **kwargs: uuid.UUID # noqa: ARG005 + + # prepare five different ProcessTable-like objects + p1 = SimpleNamespace(last_step="Done", last_status=ProcessStatus.COMPLETED, failed_reason=None) + p2 = SimpleNamespace(last_step="X", last_status=ProcessStatus.ABORTED, failed_reason=None) + p3 = SimpleNamespace(last_step="Y", last_status=ProcessStatus.FAILED, failed_reason="Bad foo") + p4 = SimpleNamespace(last_step="Z", last_status=ProcessStatus.FAILED, failed_reason=None) + p5 = SimpleNamespace(last_step="L", last_status="RUNNING", failed_reason=None) + + mock_wait_for_workflow_to_stop.side_effect = [p1, p2, p3, p4, p5] + + mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://callback.host")) + mock_post.return_value = SimpleNamespace(ok=True) + + # run task + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb") + + expected_payload = { + "successful_wfs": {"r1.example.com": "Done"}, + "failed_wfs": { + "r2.example.com": "Workflow was aborted", + "r3.example.com": "Bad foo", + "r4.example.com": "Workflow failed without a reason", + "r5.example.com": "Workflow status: RUNNING, last step: L", + }, + } + + mock_post.assert_called_once_with("http://callback.host/cb", json=expected_payload, timeout=30) + + +@patch("gso.tasks.massive_redeploy_base_config.requests.post") +@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params") +@patch("gso.tasks.massive_redeploy_base_config.start_process") +@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop") +def test_timeout_and_validation_and_unexpected( + mock_wait_for_workflow_to_stop, + mock_start_process, + mock_load_oss, + mock_post, + router_subscription_factory, + faker, + caplog, +): + """ + Test three error branches: + - wait_for_workflow_to_stop → None (timeout) + - start_process raises FormValidationError + - start_process raises generic Exception + """ + # create three routers (their subscription_id is a UUID) + r_timeout = router_subscription_factory(router_fqdn="t1.example.com") + r_validate = router_subscription_factory(router_fqdn="t2.example.com") + r_crash = router_subscription_factory(router_fqdn="t3.example.com") + router_ids = [ + r_timeout.subscription_id, + r_validate.subscription_id, + r_crash.subscription_id, + ] + + # build a real ValidationError via a dummy Pydantic model + class TempModel(BaseModel): + x: int + + try: + TempModel(x="not_an_int") + except ValidationError as ve: + # supply an explicit (empty) translations dict so PydanticI18n initializes + translator = PydanticI18n(source={"en_US": {}}) + validation_exc = FormValidationError("TempModel", ve, translator, locale="en_US") + + # fake start_process: timeout for first, validation_error for second, crash for third + def fake_start(name, user_inputs): + rid = user_inputs[0]["subscription_id"] + if rid == r_validate.subscription_id: + raise validation_exc + if rid == r_crash.subscription_id: + msg = "boom" + raise RuntimeError(msg) + return f"pid-{rid}" + + mock_start_process.side_effect = fake_start + + # always timeout (None) for the first router + mock_wait_for_workflow_to_stop.return_value = None + + # stub OSS params and successful HTTP callback + mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="https://host")) + mock_post.return_value = SimpleNamespace(ok=True) + + caplog.set_level(logging.ERROR) + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/done") + + expected_failed = { + "t1.example.com": "Timed out waiting for workflow to complete", + "t2.example.com": f"Validation error: {validation_exc}", + "t3.example.com": "Unexpected error: boom", + } + expected_payload = {"successful_wfs": {}, "failed_wfs": expected_failed} + + mock_post.assert_called_once_with( + "https://host/done", + json=expected_payload, + timeout=30, + ) + + +@patch("gso.tasks.massive_redeploy_base_config.requests.post") +@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params") +@patch("gso.tasks.massive_redeploy_base_config.start_process") +@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop") +@patch("gso.tasks.massive_redeploy_base_config.Router.from_subscription") +def test_callback_failure_and_exception( + mock_from_subscription, + mock_wait_for_workflow_to_stop, + mock_start_process, + mock_load_oss_params, + mock_requests_post, + caplog, + router_subscription_factory, + faker, +): + """ + Test that when the HTTP callback either returns ok=False or raises, we log.exception. + """ + # Arrange: one router subscription + subscription = router_subscription_factory(router_fqdn="r1.fqdn") + mock_from_subscription.return_value = subscription + router_ids = [subscription.subscription_id] + + # workflow always completes successfully + mock_start_process.return_value = "pid" + mock_wait_for_workflow_to_stop.return_value = SimpleNamespace( + last_step="Done", + last_status=ProcessStatus.COMPLETED, + failed_reason=None, + ) + + # OSS host stub + mock_load_oss_params.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://h")) + + caplog.set_level(logging.ERROR) + + # 1) callback returns ok=False → logs "Callback failed" + mock_requests_post.return_value = SimpleNamespace(ok=False, status_code=500, text="server error") + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1") + assert "Callback failed" in caplog.text + + caplog.clear() + + # 2) callback raises → logs "Failed to post callback: net down" + mock_requests_post.side_effect = Exception("net down") + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1") + assert "Failed to post callback: net down" in caplog.text diff --git a/test/utils/test_helpers.py b/test/utils/test_helpers.py index f68cd05230fee82143cd5a6abb3491d06d457bf6..df18bbc9959c42a395979fa6082ebc80ff3c59fd 100644 --- a/test/utils/test_helpers.py +++ b/test/utils/test_helpers.py @@ -1,6 +1,9 @@ +import logging from unittest.mock import patch +from uuid import uuid4 import pytest +from orchestrator.db import ProcessTable from orchestrator.types import SubscriptionLifecycle from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock @@ -10,6 +13,7 @@ from gso.utils.helpers import ( generate_inventory_for_routers, generate_lan_switch_interconnect_subnet_v4, generate_lan_switch_interconnect_subnet_v6, + wait_for_workflow_to_stop, ) from gso.utils.shared_enums import Vendor from gso.utils.types.tt_number import validate_tt_number @@ -162,3 +166,46 @@ def test_generate_lan_switch_interconnect_subnet_v6(execution_count, site_subscr str(generate_lan_switch_interconnect_subnet_v6(site.site.site_internal_id)) == f"beef:cafe:0:{hex(site.site.site_internal_id).split("x")[-1]}::/64" ) + + +@patch("gso.utils.helpers.time.sleep", lambda _: None) +@patch("gso.utils.helpers.get_stopped_process_by_id") +def test_wait_for_workflow_to_stop_success(mock_get_stopped, caplog): + """Simulate get_stopped_process_by_id returning a process on the 3rd attempt.""" + # Configure the side effect: two Nones, then a process + stopped_proc = ProcessTable(last_status="completed", last_step="Done") + mock_get_stopped.side_effect = [None, None, stopped_proc] + + caplog.set_level(logging.INFO) + pid = uuid4() + + proc = wait_for_workflow_to_stop( + process_id=pid, + check_interval=0, + max_retries=5, + ) + + # Assertions + assert proc is stopped_proc + assert proc.last_status == "completed" + assert mock_get_stopped.call_count == 3 + assert f"✅ Process {pid} has stopped with status: completed" in caplog.text + + +@patch("gso.utils.helpers.time.sleep", lambda _: None) +@patch("gso.utils.helpers.get_stopped_process_by_id", return_value=None) +def test_wait_for_workflow_to_stop_timeout(mock_get_stopped, caplog): + """Simulate get_stopped_process_by_id never finding a stopped process.""" + caplog.set_level(logging.ERROR) + pid = uuid4() + + result = wait_for_workflow_to_stop( + process_id=pid, + check_interval=0, + max_retries=3, + ) + + assert result is None + # max_retries * check_interval = 0 + assert f"❌ Timeout reached. Workflow {pid} did not stop after 0 seconds." in caplog.text + assert mock_get_stopped.call_count == 3 diff --git a/test/workflows/router/test_redeploy_base_config.py b/test/workflows/router/test_redeploy_base_config.py index 4dee5a5e9d18ed93cc08812f85f256db5174688c..66c915b8abea6443cab6c2152c596761b7a18342 100644 --- a/test/workflows/router/test_redeploy_base_config.py +++ b/test/workflows/router/test_redeploy_base_config.py @@ -4,13 +4,16 @@ from gso.products.product_types.router import Router from test.workflows import ( assert_complete, assert_lso_interaction_success, + assert_lso_success, extract_state, run_workflow, ) @pytest.mark.workflow() +@pytest.mark.parametrize("is_massive_redeploy", [False, True]) def test_redeploy_base_config_success( + is_massive_redeploy, router_subscription_factory, faker, ): @@ -18,11 +21,18 @@ def test_redeploy_base_config_success( product_id = str(router_subscription_factory().subscription_id) # Run workflow - initial_input_data = [{"subscription_id": product_id}, {"tt_number": faker.tt_number()}] + initial_input_data = [ + {"subscription_id": product_id}, + {"tt_number": faker.tt_number(), "is_massive_redeploy": is_massive_redeploy}, + ] result, process_stat, step_log = run_workflow("redeploy_base_config", initial_input_data) - for _ in range(2): - result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + if is_massive_redeploy: + for _ in range(1): + result, step_log = assert_lso_success(result, process_stat, step_log) + else: + for _ in range(2): + result, step_log = assert_lso_interaction_success(result, process_stat, step_log) assert_complete(result) diff --git a/test/workflows/tasks/test_redeploy_base_config.py b/test/workflows/tasks/test_redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..8c3c25f29693bfbf8349c4ee68c7577d55d7c0ab --- /dev/null +++ b/test/workflows/tasks/test_redeploy_base_config.py @@ -0,0 +1,93 @@ +from unittest.mock import patch + +import pytest + +import test +from test.workflows import ( + assert_awaiting_callback, + assert_complete, + extract_state, + resume_suspended_workflow, + resume_workflow, + run_workflow, +) + + +@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task") +@pytest.mark.workflow() +def test_task_redeploy_base_config_success( + mocked_massive_redeploy_base_config_task, + router_subscription_factory, + faker, +): + selected_routers = [str(router_subscription_factory().subscription_id) for _ in range(2)] + + # Run workflow task + initial_input_data = [ + {"tt_number": faker.tt_number(), "selected_routers": selected_routers}, + ] + result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data) + + assert_awaiting_callback(result) + result, step_log = resume_workflow( + process_stat, + step_log, + input_data={ + "callback_result": { + "failed_wfs": {}, + "successful_wfs": { + "t4.example.com": "Done", + }, + }, + }, + ) + + assert_complete(result) + + state = extract_state(result) + + assert state["tt_number"] == initial_input_data[0]["tt_number"] + assert state["failed_wfs"] == {} + assert state["successful_wfs"] == { + "t4.example.com": "Done", + } + + +@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task") +@pytest.mark.workflow() +def test_task_redeploy_base_config_failure( + mocked_massive_redeploy_base_config_task, router_subscription_factory, faker +): + selected_routers = [str(router_subscription_factory().subscription_id) for _ in range(2)] + + # Run workflow task + initial_input_data = [ + {"tt_number": faker.tt_number(), "selected_routers": selected_routers}, + ] + result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data) + + fake_callback_result = { + "callback_result": { + "failed_wfs": { + "t1.example.com": "Timed out waiting for workflow to complete", + "t2.example.com": "Validation error: validation_exc", + "t3.example.com": "Unexpected error: boom", + }, + "successful_wfs": { + "t4.example.com": "Done", + }, + }, + } + assert_awaiting_callback(result) + result, step_log = resume_workflow(process_stat, step_log, input_data=fake_callback_result) + + result, step_log = resume_suspended_workflow( + result, process_stat, step_log, input_data=test.USER_CONFIRM_EMPTY_FORM + ) + + assert_complete(result) + state = extract_state(result) + + assert state["tt_number"] == initial_input_data[0]["tt_number"] + assert state["failed_wfs"] == fake_callback_result["callback_result"]["failed_wfs"] + assert state["successful_wfs"] == fake_callback_result["callback_result"]["successful_wfs"]