Skip to content
Snippets Groups Projects
Commit 57139e29 authored by Neda Moeini's avatar Neda Moeini
Browse files

Added Modify and Terminate site workflow.

parent 52710d9d
No related branches found
No related tags found
1 merge request!98Added Modify and Terminate site workflow.
Pipeline #84395 passed
"""Add Site modification and termination workflow..
Revision ID: 259c320235f5
Revises: 394dc60d5c02
Create Date: 2023-11-02 10:12:09.778614
"""
import sqlalchemy as sa
from alembic import op
from orchestrator.migrations.helpers import create_workflow, delete_workflow
# revision identifiers, used by Alembic.
revision = "259c320235f5"
down_revision = "394dc60d5c02"
branch_labels = None
depends_on = None
new_workflows = [
{"name": "modify_site", "target": "MODIFY", "description": "Modify site", "product_type": "Site"},
{"name": "terminate_site", "target": "TERMINATE", "description": "Terminate site", "product_type": "Site"},
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
......@@ -87,22 +87,25 @@ def get_product_id_by_name(product_name: ProductType) -> UUID:
return ProductTable.query.filter_by(name=product_name).first().product_id
def get_active_site_subscription_by_name(site_name: str) -> Subscription:
"""Retrieve an active subscription for a site by the site's name.
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[Subscription]:
"""Retrieve a list of active subscriptions based on a specified field and its value.
:param site_name: The name of the site for which to retrieve the subscription.
:type site_name: str
:param field_name: The name of the field to filter by.
:type field_name: str
:return: The Subscription object for the site.
:rtype: Subscription
:param field_value: The value of the field to match against.
:type field_value: Any
:return: A list of active Subscription objects that match the criteria.
:rtype: List[Subscription]
"""
return (
SubscriptionTable.query.join(ProductTable)
.join(SubscriptionInstanceTable)
.join(SubscriptionInstanceValueTable)
.join(ResourceTypeTable)
.filter(SubscriptionInstanceValueTable.value == site_name)
.filter(ResourceTypeTable.resource_type == "site_name")
.filter(SubscriptionInstanceValueTable.value == field_value)
.filter(ResourceTypeTable.resource_type == field_name)
.filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
.first()
.all()
)
import ipaddress
import re
from ipaddress import IPv4Address
from uuid import UUID
import pycountry
from orchestrator import step
from orchestrator.types import State, UUIDstr
from pydantic import BaseModel
......@@ -13,6 +15,7 @@ from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
class LAGMember(BaseModel):
......@@ -151,3 +154,28 @@ def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMe
if len(interface_names) != len(set(interface_names)):
raise ValueError("Interfaces must be unique.")
return interfaces
def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int:
"""Validate that a site field is unique."""
if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0:
raise ValueError(f"{field_name} must be unique")
return value
def validate_ipv4_or_ipv6(value: str) -> str:
"""Validate that a value is a valid IPv4 or IPv6 address."""
try:
ipaddress.ip_address(value)
return value
except ValueError:
raise ValueError("Enter a valid IPv4 or IPv6 address.")
def validate_country_code(country_code: str) -> str:
"""Validate that a country code is valid."""
try:
pycountry.countries.lookup(country_code)
return country_code
except LookupError:
raise ValueError("Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format.")
......@@ -9,6 +9,8 @@ LazyWorkflowInstance("gso.workflows.iptrunk.terminate_iptrunk", "terminate_iptru
LazyWorkflowInstance("gso.workflows.router.create_router", "create_router")
LazyWorkflowInstance("gso.workflows.router.terminate_router", "terminate_router")
LazyWorkflowInstance("gso.workflows.site.create_site", "create_site")
LazyWorkflowInstance("gso.workflows.site.modify_site", "modify_site")
LazyWorkflowInstance("gso.workflows.site.terminate_site", "terminate_site")
LazyWorkflowInstance("gso.workflows.tasks.import_site", "import_site")
LazyWorkflowInstance("gso.workflows.tasks.import_router", "import_router")
LazyWorkflowInstance("gso.workflows.tasks.import_iptrunk", "import_iptrunk")
import ipaddress
from typing import NoReturn
import pycountry
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
......@@ -9,11 +5,13 @@ from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
from pydantic.fields import ModelField
from gso.products.product_blocks import site as site_pb
from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate
from gso.products.product_types import site
from gso.services.crm import customer_selector
from gso.utils.helpers import validate_country_code, validate_ipv4_or_ipv6, validate_site_fields_is_unique
def initial_input_form_generator(product_name: str) -> FormGenerator: # noqa: C901
......@@ -33,21 +31,20 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: # noqa: C
site_tier: site_pb.SiteTier
site_ts_address: str
@validator("site_ts_address", allow_reuse=True)
def validate_ts_address(cls, site_ts_address: str) -> str:
validate_site_fields_is_unique("site_ts_address", site_ts_address)
validate_ipv4_or_ipv6(site_ts_address)
return site_ts_address
@validator("site_country_code", allow_reuse=True)
def country_code_must_exist(cls, country_code: str) -> str | NoReturn:
try:
_ = pycountry.countries.lookup(country_code)
return country_code
except LookupError:
raise ValueError("Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format.")
def country_code_must_exist(cls, country_code: str) -> str:
validate_country_code(country_code)
return country_code
@validator("site_ts_address", allow_reuse=True)
def ts_address_must_be_valid(cls, ts_address: str) -> str | NoReturn:
try:
ipaddress.ip_address(ts_address)
return ts_address
except ValueError:
raise ValueError("Enter a valid IPv4 or v6 address.")
@validator("site_name", "site_internal_id", "site_bgp_community_id", allow_reuse=True)
def validate_unique_fields(cls, value: str, field: ModelField) -> str | int:
return validate_site_fields_is_unique(field.name, value)
user_input = yield CreateSiteForm
......
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import validator
from pydantic.fields import ModelField
from pydantic_forms.core import ReadOnlyField
from gso.products.product_blocks import site as site_pb
from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate
from gso.products.product_types.site import Site
from gso.utils.helpers import validate_ipv4_or_ipv6, validate_site_fields_is_unique
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
subscription = Site.from_subscription(subscription_id)
class ModifySiteForm(FormPage):
class Config:
title = "Modify Site"
site_name: str = ReadOnlyField(subscription.site.site_name)
site_city: str = subscription.site.site_city
site_country: str = ReadOnlyField(subscription.site.site_country)
site_country_code: str = ReadOnlyField(subscription.site.site_country_code)
site_latitude: LatitudeCoordinate = subscription.site.site_latitude
site_longitude: LongitudeCoordinate = subscription.site.site_longitude
site_bgp_community_id: int = subscription.site.site_bgp_community_id
site_internal_id: int = subscription.site.site_internal_id
site_tier: site_pb.SiteTier = ReadOnlyField(subscription.site.site_tier)
site_ts_address: str | None = subscription.site.site_ts_address
@validator("site_ts_address", allow_reuse=True)
def validate_ts_address(cls, site_ts_address: str) -> str:
if site_ts_address and site_ts_address != subscription.site.site_ts_address:
validate_site_fields_is_unique("site_ts_address", site_ts_address)
validate_ipv4_or_ipv6(site_ts_address)
return site_ts_address
@validator("site_internal_id", "site_bgp_community_id", allow_reuse=True)
def validate_unique_fields(cls, value: str, field: ModelField) -> str | int:
if value == getattr(subscription.site, field.name):
return value
return validate_site_fields_is_unique(field.name, value)
user_input = yield ModifySiteForm
return user_input.dict()
@step("Modify subscription")
def modify_site_subscription(
subscription: Site,
site_city: str,
site_latitude: LatitudeCoordinate,
site_longitude: LongitudeCoordinate,
site_bgp_community_id: int,
site_internal_id: int,
site_ts_address: str,
) -> State:
subscription.site.site_city = site_city
subscription.site.site_latitude = site_latitude
subscription.site.site_longitude = site_longitude
subscription.site.site_bgp_community_id = site_bgp_community_id
subscription.site.site_internal_id = site_internal_id
subscription.site.site_ts_address = site_ts_address
subscription.description = f"Site in {site_city}, {subscription.site.site_country}"
return {"subscription": subscription}
@workflow(
"Modify Site",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def modify_site() -> StepList:
return (
init
>> store_process_subscription(Target.MODIFY)
>> unsync
>> modify_site_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.site import Site
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
Site.from_subscription(subscription_id)
class TerminateForm(FormPage):
termination_label: Label = "Are you sure you want to delete this site?" # type: ignore[assignment]
user_input = yield TerminateForm
return user_input.dict()
@workflow(
"Terminate Site",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE,
)
def terminate_site() -> StepList:
return (
init
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)
......@@ -28,7 +28,7 @@ def _generate_routers() -> dict[str, str]:
def initial_input_form_generator() -> FormGenerator:
routers = _generate_routers()
RouterEnum = Choice("Select a router", zip(routers.keys(), routers.items())) # type: ignore[arg-type]
router_enum = Choice("Select a router", zip(routers.keys(), routers.items())) # type: ignore[arg-type]
class CreateIptrunkForm(FormPage):
class Config:
......@@ -41,12 +41,12 @@ def initial_input_form_generator() -> FormGenerator:
iptrunk_speed: PhyPortCapacity
iptrunk_minimum_links: int
side_a_node_id: RouterEnum # type: ignore[valid-type]
side_a_node_id: router_enum # type: ignore[valid-type]
side_a_ae_iface: str
side_a_ae_geant_a_sid: str
side_a_ae_members: UniqueConstrainedList[LAGMember]
side_b_node_id: RouterEnum # type: ignore[valid-type]
side_b_node_id: router_enum # type: ignore[valid-type]
side_b_ae_iface: str
side_b_ae_geant_a_sid: str
side_b_ae_members: UniqueConstrainedList[LAGMember]
......
......@@ -25,7 +25,7 @@ def _get_site_by_name(site_name: str) -> Site:
----
site_name (str): The name of the site.
"""
subscription = subscriptions.get_active_site_subscription_by_name(site_name)
subscription = subscriptions.get_active_subscriptions_by_field_and_value("site_name", site_name)[0]
if not subscription:
raise ValueError(f"Site with name {site_name} not found.")
......
......@@ -126,7 +126,7 @@ def configuration_data() -> dict:
}
@pytest.fixture(scope="session")
@pytest.fixture(scope="session", autouse=True)
def data_config_filename(configuration_data) -> str:
file_name = os.path.join(tempfile.gettempdir(), os.urandom(24).hex())
open(file_name, "x").close()
......
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.products.product_types.site import Site
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow
def test_modify_site(responses, site_subscription_factory):
subscription_id = site_subscription_factory()
initial_site_data = [
{"subscription_id": subscription_id},
{
"site_bgp_community_id": 10,
"site_internal_id": 20,
"site_ts_address": "127.0.0.1",
},
]
result, process, step_log = run_workflow("modify_site", initial_site_data)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Site.from_subscription(subscription_id)
assert "active" == subscription.status
assert subscription.site.site_bgp_community_id == initial_site_data[1]["site_bgp_community_id"]
assert subscription.site.site_internal_id == initial_site_data[1]["site_internal_id"]
@pytest.mark.workflow
def test_modify_site_with_invalid_data(responses, site_subscription_factory):
subscription_a = Site.from_subscription(site_subscription_factory())
subscription_b = Site.from_subscription(site_subscription_factory())
initial_site_data = [
{"subscription_id": subscription_b.subscription_id},
{
"site_bgp_community_id": subscription_a.site.site_bgp_community_id,
},
]
with pytest.raises(FormValidationError) as e:
run_workflow("modify_site", initial_site_data)
assert "site_bgp_community_id must be unique" in str(e.value)
import pytest
from gso.products.product_types.site import Site
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow
def test_terminate_site(responses, site_subscription_factory):
subscription_id = site_subscription_factory()
initial_site_data = [{"subscription_id": subscription_id}, {}]
result, process, step_log = run_workflow("terminate_site", initial_site_data)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Site.from_subscription(subscription_id)
assert "terminated" == subscription.status
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment