Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 1048-service-config-backfilling
  • NAT-1154-import-edge-port-update
  • authorship-fix-from-develop
  • develop
  • feature/10GGBS-NAT-980
  • feature/NAT-1260-refactor-precheck-command
  • feature/NAT-1582-moify-prefix-limit-wf
  • feature/NAT-732-ias-to-re-interconnect
  • feature/add-modify-note
  • feature/add-moodi-wf-to-router
  • feature/add-prefix-product
  • feature/nat-1211-edgeport-lacp-xmit
  • feature/nat-1493-edge-port-redeploy
  • fix/NAT-1009/fix-redeploy-base-config-if-there-is-a-vprn
  • fix/l3-imports
  • fix/nat-1120-sdp-validation
  • master
  • release/4.6
  • release/4.7
  • solr-authorship-fix
  • 0.1
  • 0.2
  • 0.3
  • 0.4
  • 0.5
  • 0.6
  • 0.7
  • 0.8
  • 0.9
  • 1.0
  • 1.1
  • 1.4
  • 1.5
  • 2.0
  • 2.1
  • 2.10
  • 2.11
  • 2.12
  • 2.13
  • 2.14
  • 2.15
  • 2.16
  • 2.17
  • 2.18
  • 2.19
  • 2.2
  • 2.20
  • 2.21
  • 2.22
  • 2.23
  • 2.24
  • 2.25
  • 2.26
  • 2.27
  • 2.28
  • 2.29
  • 2.3
  • 2.31
  • 2.32
  • 2.33
  • 2.34
  • 2.35
  • 2.36
  • 2.37
  • 2.38
  • 2.39
  • 2.4
  • 2.40
  • 2.41
  • 2.42
  • 2.43
  • 2.44
  • 2.45
  • 2.46
  • 2.47
  • 2.48
  • 2.5
  • 2.6
  • 2.7
  • 2.8
  • 2.9
  • 3.0
  • 3.1
  • 3.10
  • 3.11
  • 3.12
  • 3.2
  • 3.3
  • 3.4
  • 3.5
  • 3.6
  • 3.7
  • 3.8
  • 3.9
  • 4.0
  • 4.1
  • 4.2
  • 4.3
  • 4.4
  • 4.5
  • 4.8
  • Lime-Seal
102 results

Target

Select target project
No results found
Select Git revision
  • 1048-service-config-backfilling
  • NAT-1154-import-edge-port-update
  • authorship-fix-from-develop
  • develop
  • feature/10GGBS-NAT-980
  • feature/NAT-1260-refactor-precheck-command
  • feature/NAT-1582-moify-prefix-limit-wf
  • feature/NAT-732-ias-to-re-interconnect
  • feature/add-modify-note
  • feature/add-moodi-wf-to-router
  • feature/add-prefix-product
  • feature/nat-1211-edgeport-lacp-xmit
  • feature/nat-1493-edge-port-redeploy
  • fix/NAT-1009/fix-redeploy-base-config-if-there-is-a-vprn
  • fix/l3-imports
  • fix/nat-1120-sdp-validation
  • master
  • release/4.6
  • release/4.7
  • solr-authorship-fix
  • 0.1
  • 0.2
  • 0.3
  • 0.4
  • 0.5
  • 0.6
  • 0.7
  • 0.8
  • 0.9
  • 1.0
  • 1.1
  • 1.4
  • 1.5
  • 2.0
  • 2.1
  • 2.10
  • 2.11
  • 2.12
  • 2.13
  • 2.14
  • 2.15
  • 2.16
  • 2.17
  • 2.18
  • 2.19
  • 2.2
  • 2.20
  • 2.21
  • 2.22
  • 2.23
  • 2.24
  • 2.25
  • 2.26
  • 2.27
  • 2.28
  • 2.29
  • 2.3
  • 2.31
  • 2.32
  • 2.33
  • 2.34
  • 2.35
  • 2.36
  • 2.37
  • 2.38
  • 2.39
  • 2.4
  • 2.40
  • 2.41
  • 2.42
  • 2.43
  • 2.44
  • 2.45
  • 2.46
  • 2.47
  • 2.48
  • 2.5
  • 2.6
  • 2.7
  • 2.8
  • 2.9
  • 3.0
  • 3.1
  • 3.10
  • 3.11
  • 3.12
  • 3.2
  • 3.3
  • 3.4
  • 3.5
  • 3.6
  • 3.7
  • 3.8
  • 3.9
  • 4.0
  • 4.1
  • 4.2
  • 4.3
  • 4.4
  • 4.5
  • 4.8
  • Lime-Seal
102 results
Show changes

Commits on Source 50

115 files
+ 4996
734
Compare changes
  • Side-by-side
  • Inline

Files

+1 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ set -o errexit
set -o nounset

export OSS_PARAMS_FILENAME=../gso/oss-params-example.json
export TESTING=true

pip install sphinx_rtd_theme sphinxcontrib-jquery

Original line number Diff line number Diff line
@@ -9,6 +9,9 @@ Glossary of terms
  API
    Application Programming Interface

  BFD
    Bi-directional Forwarding Detection

  BGP
    Border Gateway Protocol: a path vector routing protocol described in
    `RFC 4271 <https://datatracker.ietf.org/doc/html/rfc4271>`_.
@@ -54,18 +57,27 @@ Glossary of terms
  JSON
    JavaScript Object Notation

  LACP
    Link Aggregation Control Protocol

  LAG
    Link Aggregation: a bundle of multiple network connections.

  LAN
    Local Area Network

  LLDP
    Link Layer Discovery Protocol

  LSO
    Lightweight Service Orchestrator

  NET
    Network Entity Title: used for :term:`ISIS` routing.

  NREN
    National Research and Education Network

  OIDC
    OpenID Connect

@@ -78,6 +90,14 @@ Glossary of terms
  OSS
    Operational Support Systems

  SBP
    Service Binding Point, a logical construct used in the orchestrator to attach a partner subscription to a physical
    (set of) ports.

  SDP
    Service Demarcation Point: A logical construct used for modeling partner subscriptions. It models the link between
    the physical and the service domains.

  SNMP
    Simple Network Management Protocol: a protocol that's used for gathering data, widely used for network management
    and monitoring.
@@ -91,5 +111,8 @@ Glossary of terms
  VLAN
    Virtual LAN

  WAN
    Wide Area Network

  WFO
    `Workflow Orchestrator <https://workfloworchestrator.org/>`_
Original line number Diff line number Diff line
``gso.products.product_blocks.bgp_session``
===========================================

.. automodule:: gso.products.product_blocks.bgp_session
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.products.product_blocks.edge_port``
=========================================

.. automodule:: gso.products.product_blocks.edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.products.product_blocks.geant_ip``
========================================

.. automodule:: gso.products.product_blocks.geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
@@ -14,13 +14,16 @@ Submodules
.. toctree::
   :maxdepth: 1

   super_pop_switch
   office_router
   bgp_session
   edge_port
   geant_ip
   iptrunk
   lan_switch_interconnect
   office_router
   opengear
   pop_vlan
   router
   service_binding_port
   site
   super_pop_switch
   switch
   lan_switch_interconnect
   pop_vlan
   opengear
Original line number Diff line number Diff line
``gso.products.product_blocks.service_binding_port``
====================================================

.. automodule:: gso.products.product_blocks.service_binding_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.products.product_types.edge_port``
========================================

.. automodule:: gso.products.product_types.edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.products.product_types.geant_ip``
=======================================

.. automodule:: gso.products.product_types.geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
@@ -14,12 +14,14 @@ Submodules
.. toctree::
   :maxdepth: 1

   super_pop_switch
   office_router
   edge_port
   geant_ip
   iptrunk
   lan_switch_interconnect
   office_router
   opengear
   pop_vlan
   router
   site
   super_pop_switch
   switch
   lan_switch_interconnect
   pop_vlan
   opengear
Original line number Diff line number Diff line
@@ -5,6 +5,14 @@
   :members:
   :show-inheritance:

Subpackages
-----------

.. toctree::
   :maxdepth: 1

   types/index

Submodules
----------

Original line number Diff line number Diff line
``gso.utils.types.base_site``
=============================

.. automodule:: gso.utils.types.base_site
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.coordinates``
===============================

.. automodule:: gso.utils.types.coordinates
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.country_code``
===============================

.. automodule:: gso.utils.types.country_code
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
@@ -4,3 +4,21 @@
.. automodule:: gso.utils.types
   :members:
   :show-inheritance:


Submodules
----------

.. toctree::
   :maxdepth: 2
   :titlesonly:

   base_site
   coordinates
   country_code
   interfaces
   ip_address
   netbox_router
   site_name
   tt_number
   unique_field
Original line number Diff line number Diff line
``gso.utils.types.interfaces``
==============================

.. automodule:: gso.utils.types.interfaces
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.ip_address``
==============================

.. automodule:: gso.utils.types.ip_address
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.netbox_router``
=================================

.. automodule:: gso.utils.types.netbox_router
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.site_name``
=============================

.. automodule:: gso.utils.types.site_name
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.tt_number``
=============================

.. automodule:: gso.utils.types.tt_number
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.utils.types.unique_field``
================================

.. automodule:: gso.utils.types.unique_field
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.edge_port.create_edge_port``
============================================

.. automodule:: gso.workflows.edge_port.create_edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.edge_port.create_imported_edge_port``
=====================================================

.. automodule:: gso.workflows.edge_port.create_imported_edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.edge_port.import_edge_port``
============================================

.. automodule:: gso.workflows.edge_port.import_edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.edge_port``
===========================

.. automodule:: gso.workflows.edge_port
   :members:
   :show-inheritance:

Submodules
----------

.. toctree::
   :maxdepth: 2
   :titlesonly:

   create_edge_port
   modify_edge_port
   terminate_edge_port
   validate_edge_port
   import_edge_port
   create_imported_edge_port
Original line number Diff line number Diff line
``gso.workflows.edge_port.modify_edge_port``
============================================

.. automodule:: gso.workflows.edge_port.modify_edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.edge_port.terminate_edge_port``
===============================================

.. automodule:: gso.workflows.edge_port.terminate_edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.edge_port.validate_edge_port``
==============================================

.. automodule:: gso.workflows.edge_port.validate_edge_port
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.geant_ip.create_geant_ip``
==========================================

.. automodule:: gso.workflows.geant_ip.create_geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.geant_ip.create_imported_geant_ip``
====================================================

.. automodule:: gso.workflows.geant_ip.create_imported_geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.geant_ip.import_geant_ip``
==========================================

.. automodule:: gso.workflows.geant_ip.import_geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.geant_ip``
==========================

.. automodule:: gso.workflows.geant_ip
   :members:
   :show-inheritance:

Submodules
----------

.. toctree::
   :maxdepth: 2
   :titlesonly:

   create_geant_ip
   create_imported_geant_ip
   import_geant_ip
   migrate_geant_ip
   modify_geant_ip
Original line number Diff line number Diff line
``gso.workflows.geant_ip.migrate_geant_ip``
===========================================

.. automodule:: gso.workflows.geant_ip.migrate_geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
``gso.workflows.geant_ip.modify_geant_ip``
==========================================

.. automodule:: gso.workflows.geant_ip.modify_geant_ip
   :members:
   :show-inheritance:
Original line number Diff line number Diff line
@@ -19,3 +19,5 @@ Subpackages
   site/index
   super_pop_switch/index
   tasks/index
   edge_port/index
   geant_ip/index
 No newline at end of file
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ custom.Contractions = YES
; Using a "regular" - instead of an en dash is totally fine
Microsoft.Negative = NO
Microsoft.RangeFormat = NO
Microsoft.We = suggestion

TokenIgnores = (:term:`\S+`), (:param \S+(?: \S+)?:), (:type \S+:), (:return \S+:), (:rtype: \S+), (:class:`\S+`)

Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ swap:
  can't: cannot
  couldn't: could not
  didn't: did not
  don't: do not
  doesn't: does not
  hasn't: has not
  haven't: have not
+166 −3
Original line number Diff line number Diff line
@@ -18,8 +18,11 @@ from sqlalchemy.exc import SQLAlchemyError

from gso.db.models import PartnerTable
from gso.products import ProductType
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.service_binding_port import VLAN_ID
from gso.services.partners import (
    PartnerEmail,
    PartnerName,
@@ -31,10 +34,10 @@ from gso.services.subscriptions import (
    get_active_subscriptions_by_field_and_value,
    get_subscriptions,
)
from gso.utils.shared_enums import Vendor
from gso.utils.shared_enums import SBPType, Vendor
from gso.utils.types.base_site import BaseSiteValidatorModel
from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType, PortNumber
from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType, PortNumber

app: typer.Typer = typer.Typer()

@@ -161,6 +164,95 @@ class OpenGearImportModel(BaseModel):
    opengear_wan_gateway: IPv4AddressType


class EdgePortImportModel(BaseModel):
    """Required fields for importing an existing :class:`gso.products.product_types.edge_port`."""

    node: str
    service_type: EdgePortType
    speed: PhysicalPortCapacity
    encapsulation: EncapsulationType
    name: str
    minimum_links: int
    geant_ga_id: str | None
    mac_address: str | None
    partner: str
    enable_lacp: bool
    ignore_if_down: bool
    ae_members: LAGMemberList[LAGMember]
    description: str | None = None

    @field_validator("partner")
    def check_if_partner_exists(cls, value: str) -> str:
        """Validate that the partner exists."""
        try:
            get_partner_by_name(value)
        except PartnerNotFoundError as e:
            msg = f"Partner {value} not found"
            raise ValueError(msg) from e

        return value

    @field_validator("node")
    def validate_node(cls, value: str) -> str:
        """Check if the node is an active PE router in :term:`GSO`."""
        pe_routers = {
            str(router.subscription_id)
            for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
        }
        if value not in pe_routers:
            msg = f"Router {value} not found"
            raise ValueError(msg)

        return value

    @model_validator(mode="after")
    def check_members(self) -> Self:
        """Amount of :term:`LAG` members has to match and meet the minimum requirement."""
        if len(self.ae_members) < self.minimum_links:
            msg = f"Number of members should be at least {self.minimum_links} (edge_port_minimum_links)"
            raise ValueError(msg)
        return self


class GeantIPImportModel(BaseModel):
    """Import GEANT IP model."""

    partner: str
    service_binding_ports: list["GeantIPImportModel.ServiceBindingPort"]

    class BaseBGPPeer(BaseModel):
        """Base BGP Peer model."""

        bfd_enabled: bool = False
        bfd_interval: int | None = None
        bfd_multiplier: int | None = None
        has_custom_policies: bool = False
        authentication_key: str
        multipath_enabled: bool = False
        send_default_route: bool = False
        is_passive: bool = False
        peer_address: IPAddress
        families: list[IPFamily]
        is_multi_hop: bool
        rtbh_enabled: bool  # whether Remote Triggered Blackhole is enabled

    class ServiceBindingPort(BaseModel):
        """Service Binding model."""

        edge_port: str
        ap_type: str
        geant_sid: str
        sbp_type: SBPType = SBPType.L3
        is_tagged: bool = False
        vlan_id: VLAN_ID
        custom_firewall_filters: bool = False
        ipv4_address: IPv4AddressType
        ipv6_address: IPv6AddressType
        rtbh_enabled: bool = True
        is_multi_hop: bool = True
        bgp_peers: list["GeantIPImportModel.BaseBGPPeer"]


T = TypeVar(
    "T",
    SiteImportModel,
@@ -169,6 +261,8 @@ T = TypeVar(
    SuperPopSwitchImportModel,
    OfficeRouterImportModel,
    OpenGearImportModel,
    EdgePortImportModel,
    GeantIPImportModel,
)

common_filepath_option = typer.Option(
@@ -219,7 +313,7 @@ def _generic_import_product(
    successfully_imported_data = []
    data = _read_data(file_path)
    for details in data:
        details["partner"] = "GEANT"
        details["partner"] = details.get("partner", "GEANT")
        typer.echo(f"Creating imported {name_key}: {details[name_key]}")
        try:
            initial_data = import_model(**details)
@@ -297,6 +391,38 @@ def import_opengear(filepath: str = common_filepath_option) -> None:
    )


@app.command()
def import_edge_port(filepath: str = common_filepath_option) -> None:
    """Import Edge Port into GSO."""
    successfully_imported_data = []
    data = _read_data(Path(filepath))
    for edge_port in data:
        typer.echo(f"Importing Edge Port {edge_port["name"]} on {edge_port["node"]}. ")
        try:
            edge_port["node"] = _get_router_subscription_id(edge_port["node"])
            initial_data = EdgePortImportModel(**edge_port)
            start_process("create_imported_edge_port", [initial_data.model_dump()])
            successfully_imported_data.append(edge_port["name"])
            typer.echo(f"Successfully imported Edge Port {edge_port["name"]} on {edge_port["node"]}.")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo("Waiting for the dust to settle before moving on the importing new products...")
    time.sleep(1)

    edge_port_ids = get_subscriptions(
        [ProductType.IMPORTED_EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
    )
    for subscription_id in edge_port_ids:
        typer.echo(f"Migrating Edge Port {subscription_id}")
        start_process("import_edge_port", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully imported Edge Ports:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")


@app.command()
def import_iptrunks(filepath: str = common_filepath_option) -> None:
    """Import IP trunks into GSO."""
@@ -390,3 +516,40 @@ def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV f
        typer.echo(f"Failed to import partners: {e}")
    finally:
        db.session.close()


@app.command()
def import_geant_ip(filepath: str = common_filepath_option) -> None:
    """Import GEANT IP into GSO."""
    successfully_imported_data = []
    geant_ip_list = _read_data(Path(filepath))

    for geant_ip in geant_ip_list:
        partner = geant_ip["partner"]
        typer.echo(f"Creating imported GEANT IP for {partner}")

        try:
            initial_data = GeantIPImportModel(**geant_ip)
            start_process("create_imported_geant_ip", [initial_data.model_dump()])
            edge_ports = [sbp["edge_port"] for sbp in geant_ip["service_binding_ports"]]
            successfully_imported_data.append(edge_ports)
            typer.echo(f"Successfully created imported GEANT IP for {partner}")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo("Waiting for the dust to settle before importing new products...")
    time.sleep(1)

    # Migrate new products from imported to "full" counterpart.
    imported_products = get_subscriptions(
        [ProductType.IMPORTED_GEANT_IP], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
    )

    for subscription_id in imported_products:
        typer.echo(f"Importing {subscription_id}")
        start_process("import_geant_ip", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully created imported GEANT IPs:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")
Original line number Diff line number Diff line
"""Add Edge Port and GÉANT IP workflows.

Revision ID: bf05800fe9fc
Revises: a08bf228f112
Create Date: 2024-10-08 11:22:00.038925

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'bf05800fe9fc'
down_revision = 'a08bf228f112'
branch_labels = None
depends_on = None


from orchestrator.migrations.helpers import create_workflow, delete_workflow

new_workflows = [
    {
        "name": "create_edge_port",
        "target": "CREATE",
        "description": "Create Edge Port",
        "product_type": "EdgePort"
    },
    {
        "name": "modify_edge_port",
        "target": "MODIFY",
        "description": "Modify Edge Port",
        "product_type": "EdgePort"
    },
    {
        "name": "terminate_edge_port",
        "target": "TERMINATE",
        "description": "Terminate Edge Port",
        "product_type": "EdgePort"
    },
    {
        "name": "validate_edge_port",
        "target": "SYSTEM",
        "description": "Validate Edge Port Configuration",
        "product_type": "EdgePort"
    },
    {
        "name": "create_imported_edge_port",
        "target": "CREATE",
        "description": "Import Edge Port",
        "product_type": "ImportedEdgePort"
    },
    {
        "name": "import_edge_port",
        "target": "MODIFY",
        "description": "Import Edge Port",
        "product_type": "ImportedEdgePort"
    },
    {
        "name": "create_geant_ip",
        "target": "CREATE",
        "description": "Create G\u00c9ANT IP",
        "product_type": "GeantIP"
    },
    {
        "name": "modify_geant_ip",
        "target": "MODIFY",
        "description": "Modify G\u00c9ANT IP",
        "product_type": "GeantIP"
    },
    {
        "name": "migrate_geant_ip",
        "target": "MODIFY",
        "description": "Migrate G\u00c9ANT IP",
        "product_type": "GeantIP"
    },
    {
        "name": "create_imported_geant_ip",
        "target": "CREATE",
        "description": "Import G\u00c9ANT IP",
        "product_type": "ImportedGeantIP"
    },
    {
        "name": "import_geant_ip",
        "target": "MODIFY",
        "description": "Import G\u00c9ANT IP",
        "product_type": "ImportedGeantIP"
    }
]


def upgrade() -> None:
    conn = op.get_bind()
    for workflow in new_workflows:
        create_workflow(conn, workflow)


def downgrade() -> None:
    conn = op.get_bind()
    for workflow in new_workflows:
        delete_workflow(conn, workflow["name"])
Original line number Diff line number Diff line
"""Add IPV4/IPV6 netmask to Service Binding Port model .

Revision ID: df108295d917
Revises: bf05800fe9fc
Create Date: 2024-10-10 11:39:43.051211

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'df108295d917'
down_revision = 'bf05800fe9fc'
branch_labels = None
depends_on = None


def upgrade() -> None:
    conn = op.get_bind()
    conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('ipv4_mask', 'IPV4 subnet mask') RETURNING resource_types.resource_type_id
    """))
    conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('ipv6_mask', 'IPV6 subnet mask') RETURNING resource_types.resource_type_id
    """))
    conn.execute(sa.text("""
INSERT INTO product_block_resource_types (product_block_id, resource_type_id) VALUES ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('ServiceBindingPort')), (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv4_mask')))
    """))
    conn.execute(sa.text("""
INSERT INTO product_block_resource_types (product_block_id, resource_type_id) VALUES ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('ServiceBindingPort')), (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv6_mask')))
    """))
    conn.execute(sa.text("""

                WITH subscription_instance_ids AS (
                    SELECT subscription_instances.subscription_instance_id
                    FROM   subscription_instances
                    WHERE  subscription_instances.product_block_id IN (
                        SELECT product_blocks.product_block_id
                        FROM   product_blocks
                        WHERE  product_blocks.name = 'ServiceBindingPort'
                    )
                )

                INSERT INTO
                    subscription_instance_values (subscription_instance_id, resource_type_id, value)
                SELECT
                    subscription_instance_ids.subscription_instance_id,
                    resource_types.resource_type_id,
                    'None'
                FROM resource_types
                CROSS JOIN subscription_instance_ids
                WHERE resource_types.resource_type = 'ipv4_mask'
        
    """))
    conn.execute(sa.text("""

                WITH subscription_instance_ids AS (
                    SELECT subscription_instances.subscription_instance_id
                    FROM   subscription_instances
                    WHERE  subscription_instances.product_block_id IN (
                        SELECT product_blocks.product_block_id
                        FROM   product_blocks
                        WHERE  product_blocks.name = 'ServiceBindingPort'
                    )
                )

                INSERT INTO
                    subscription_instance_values (subscription_instance_id, resource_type_id, value)
                SELECT
                    subscription_instance_ids.subscription_instance_id,
                    resource_types.resource_type_id,
                    'None'
                FROM resource_types
                CROSS JOIN subscription_instance_ids
                WHERE resource_types.resource_type = 'ipv6_mask'
        
    """))


def downgrade() -> None:
    conn = op.get_bind()
    conn.execute(sa.text("""
DELETE FROM product_block_resource_types WHERE product_block_resource_types.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('ServiceBindingPort')) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv4_mask'))
    """))
    conn.execute(sa.text("""
DELETE FROM subscription_instance_values USING product_block_resource_types WHERE subscription_instance_values.subscription_instance_id IN (SELECT subscription_instances.subscription_instance_id FROM subscription_instances WHERE subscription_instances.subscription_instance_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('ServiceBindingPort'))) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv4_mask'))
    """))
    conn.execute(sa.text("""
DELETE FROM product_block_resource_types WHERE product_block_resource_types.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('ServiceBindingPort')) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv6_mask'))
    """))
    conn.execute(sa.text("""
DELETE FROM subscription_instance_values USING product_block_resource_types WHERE subscription_instance_values.subscription_instance_id IN (SELECT subscription_instances.subscription_instance_id FROM subscription_instances WHERE subscription_instances.subscription_instance_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('ServiceBindingPort'))) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv6_mask'))
    """))
    conn.execute(sa.text("""
DELETE FROM subscription_instance_values WHERE subscription_instance_values.resource_type_id IN (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('ipv4_mask', 'ipv6_mask'))
    """))
    conn.execute(sa.text("""
DELETE FROM resource_types WHERE resource_types.resource_type IN ('ipv4_mask', 'ipv6_mask')
    """))
Original line number Diff line number Diff line
@@ -8,8 +8,10 @@
from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY
from pydantic_forms.types import strEnum

from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort
from gso.products.product_types.iptrunk import ImportedIptrunk, Iptrunk
from gso.products.product_types.lan_switch_interconnect import LanSwitchInterconnect
from gso.products.product_types.nren_l3_core_service import ImportedNRENL3CoreService, NRENL3CoreService
from gso.products.product_types.office_router import ImportedOfficeRouter, OfficeRouter
from gso.products.product_types.opengear import ImportedOpengear, Opengear
from gso.products.product_types.pop_vlan import PopVlan
@@ -37,6 +39,12 @@ class ProductName(strEnum):
    IMPORTED_OFFICE_ROUTER = "Imported office router"
    OPENGEAR = "Opengear"
    IMPORTED_OPENGEAR = "Imported Opengear"
    EDGE_PORT = "Edge Port"
    IMPORTED_EDGE_PORT = "Imported Edge Port"
    GEANT_IP = "GÉANT IP"
    IMPORTED_GEANT_IP = "Imported GÉANT IP"
    IAS = "IAS"
    IMPORTED_IAS = "Imported IAS"


class ProductType(strEnum):
@@ -57,6 +65,12 @@ class ProductType(strEnum):
    IMPORTED_OFFICE_ROUTER = ImportedOfficeRouter.__name__
    OPENGEAR = Opengear.__name__
    IMPORTED_OPENGEAR = Opengear.__name__
    EDGE_PORT = EdgePort.__name__
    IMPORTED_EDGE_PORT = ImportedEdgePort.__name__
    GEANT_IP = NRENL3CoreService.__name__
    IMPORTED_GEANT_IP = ImportedNRENL3CoreService.__name__
    IAS = NRENL3CoreService.__name__
    IMPORTED_IAS = ImportedNRENL3CoreService.__name__


SUBSCRIPTION_MODEL_REGISTRY.update(
@@ -76,5 +90,11 @@ SUBSCRIPTION_MODEL_REGISTRY.update(
        ProductName.IMPORTED_OFFICE_ROUTER.value: ImportedOfficeRouter,
        ProductName.OPENGEAR.value: Opengear,
        ProductName.IMPORTED_OPENGEAR.value: ImportedOpengear,
        ProductName.EDGE_PORT.value: EdgePort,
        ProductName.IMPORTED_EDGE_PORT.value: ImportedEdgePort,
        ProductName.GEANT_IP.value: NRENL3CoreService,
        ProductName.IMPORTED_GEANT_IP.value: ImportedNRENL3CoreService,
        ProductName.IAS.value: NRENL3CoreService,
        ProductName.IMPORTED_IAS.value: ImportedNRENL3CoreService,
    },
)
Original line number Diff line number Diff line
""":term:`BGP` session product block."""

import strawberry
from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle
from pydantic import Field
from pydantic_forms.types import strEnum

from gso.utils.types.ip_address import IPAddress


@strawberry.enum
class IPFamily(strEnum):
    """Possible IP families of a :term:`BGP` peering."""

    V4UNICAST = "v4unicast"
    V6UNICAST = "v6unicast"
    V4MULTICAST = "v4multicast"
    V6MULTICAST = "v6multicast"


class BGPSessionInactive(ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="BGPSession"):
    """A :term:`BGP` session that is currently inactive. See :class:`BGPSession`."""

    peer_address: IPAddress | None = None
    bfd_enabled: bool | None = None
    bfd_interval: int | None = None
    bfd_multiplier: int | None = None
    families: list[IPFamily] = Field(default_factory=list)
    has_custom_policies: bool | None = None
    authentication_key: str | None = None
    multipath_enabled: bool | None = None
    send_default_route: bool | None = None
    is_multi_hop: bool = False
    is_passive: bool = False
    rtbh_enabled: bool = False


class BGPSessionProvisioning(BGPSessionInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """A :term:`BGP` session that is currently being provisioned. See :class:`BGPSession`."""

    peer_address: IPAddress
    bfd_enabled: bool
    bfd_interval: int | None = None
    bfd_multiplier: int | None = None
    families: list[IPFamily]
    has_custom_policies: bool
    authentication_key: str
    multipath_enabled: bool
    send_default_route: bool
    is_multi_hop: bool
    is_passive: bool
    rtbh_enabled: bool


class BGPSession(BGPSessionProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """A :term:`BGP` session that is currently deployed in the network."""

    #: The peering address of the session.
    peer_address: IPAddress
    #: Whether :term:`BFD` is enabled.
    bfd_enabled: bool
    #: The :term:`BFD` interval, if enabled.
    bfd_interval: int | None = None
    #: The :term:`BFD` multiplier, if enabled.
    bfd_multiplier: int | None = None
    #: The list of IP families enabled for this session.
    families: list[IPFamily]
    #: Whether any custom policies exist for this session.
    has_custom_policies: bool
    #: The authentication key of the :term:`BGP` session.
    authentication_key: str
    #: Whether multi-path is enabled.
    multipath_enabled: bool
    #: Whether we send a last resort route.
    send_default_route: bool
    #: Whether this session is multi-hop or not. Defaults to no.
    is_multi_hop: bool
    #: Whether this is a passive session.
    is_passive: bool
    #: Whether Remote Triggered Blackhole is enabled
    rtbh_enabled: bool
+122 −0
Original line number Diff line number Diff line
"""Edge port product block.

Edge port sets the boundary between Geant network and an external entity that could also be a different technological
domain still managed by GEANT. In other words, an Edge port determines where the network ends.
"""

from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle, strEnum

from gso.products.product_blocks.router import RouterBlock, RouterBlockInactive, RouterBlockProvisioning
from gso.utils.types.interfaces import LAGMemberList, PhysicalPortCapacity


class EncapsulationType(strEnum):
    """Types of encapsulation for edge ports.

    Null supports a single service on the port.
    Dot1Q supports multiple services for one customer or services for multiple customers.
    QinQ expands VLAN space by double-tagging frames.
    """

    DOT1Q = "dot1q"
    QINQ = "qinq"
    NULL = "null"


class EdgePortType(strEnum):
    """Types of edge ports."""

    CUSTOMER = "CUSTOMER"
    INFRASTRUCTURE = "INFRASTRUCTURE"
    PRIVATE = "PRIVATE"
    PUBLIC = "PUBLIC"
    RE_INTERCONNECT = "RE_INTERCONNECT"


class EdgePortAEMemberBlockInactive(
    ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="EdgePortAEMemberBlock"
):
    """An inactive Edge Port AE interface."""

    interface_name: str | None = None
    interface_description: str | None = None


class EdgePortAEMemberBlockProvisioning(EdgePortAEMemberBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """A provisional Edge Port AE interface."""

    interface_name: str
    interface_description: str | None = None


class EdgePortAEMemberBlock(EdgePortAEMemberBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """An Edge Port AE interface."""

    interface_name: str
    interface_description: str | None = None


class EdgePortBlockInactive(
    ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="EdgePortBlock"
):
    """An edge port that's currently inactive. See :class:`EdgePortBlock`."""

    edge_port_node: RouterBlockInactive | None = None
    edge_port_name: str | None = None
    edge_port_description: str | None = None
    edge_port_enable_lacp: bool | None = None
    edge_port_encapsulation: EncapsulationType = EncapsulationType.DOT1Q
    edge_port_mac_address: str | None = None
    edge_port_member_speed: PhysicalPortCapacity | None = None
    edge_port_minimum_links: int | None = None
    edge_port_type: EdgePortType | None = None
    edge_port_ignore_if_down: bool = False
    edge_port_geant_ga_id: str | None = None
    edge_port_ae_members: LAGMemberList[EdgePortAEMemberBlockInactive]


class EdgePortBlockProvisioning(EdgePortBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """An edge port that's being provisioned. See :class:`EdgePortBlock`."""

    edge_port_node: RouterBlockProvisioning
    edge_port_name: str
    edge_port_description: str | None = None
    edge_port_enable_lacp: bool
    edge_port_encapsulation: EncapsulationType = EncapsulationType.DOT1Q
    edge_port_mac_address: str | None = None
    edge_port_member_speed: PhysicalPortCapacity
    edge_port_minimum_links: int | None = None
    edge_port_type: EdgePortType
    edge_port_ignore_if_down: bool = False
    edge_port_geant_ga_id: str | None = None
    edge_port_ae_members: LAGMemberList[EdgePortAEMemberBlockProvisioning]  # type: ignore[assignment]


class EdgePortBlock(EdgePortBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """An edge port that's currently deployed in the network."""

    #: The router that this edge port is connected to.
    edge_port_node: RouterBlock
    #: The name of the edge port, in our case, corresponds to the name of the :term:`LAG` interface.
    edge_port_name: str
    #: A description of the edge port.
    edge_port_description: str | None = None
    #: Indicates whether :term:`LACP` is enabled for this edge port.
    edge_port_enable_lacp: bool
    #: The type of encapsulation used on this edge port, by default DOT1Q.
    edge_port_encapsulation: EncapsulationType = EncapsulationType.DOT1Q
    #: The MAC address assigned to this edge port, if applicable.
    edge_port_mac_address: str | None = None
    #: The speed capacity of each member in the physical port.
    edge_port_member_speed: PhysicalPortCapacity
    #: The minimum number of links required for this edge port.
    edge_port_minimum_links: int | None = None
    #: The type of edge port (e.g., customer, private, public).
    edge_port_type: EdgePortType
    #: If set to True, the edge port will be ignored if it is down.
    edge_port_ignore_if_down: bool = False
    #: The GEANT GA ID associated with this edge port, if any.
    edge_port_geant_ga_id: str | None = None
    #: A list of :term:`LAG` members associated with this edge port.
    edge_port_ae_members: LAGMemberList[EdgePortAEMemberBlock]  # type: ignore[assignment]
Original line number Diff line number Diff line
"""Product blocks for :class:`NREN` Layer 3 Core Service products."""

from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle
from pydantic import Field

from gso.products.product_blocks.service_binding_port import (
    ServiceBindingPort,
    ServiceBindingPortInactive,
    ServiceBindingPortProvisioning,
)
from gso.utils.shared_enums import APType


class NRENAccessPortInactive(
    ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="NRENAccessPort"
):
    """An access port for an R&E :term:`NREN` service that is inactive."""

    ap_type: APType | None = None
    sbp: ServiceBindingPortInactive


class NRENAccessPortProvisioning(NRENAccessPortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """An access port for an R&E :term:`NREN` service that is being provisioned."""

    ap_type: APType
    sbp: ServiceBindingPortProvisioning


class NRENAccessPort(NRENAccessPortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """An access port for an R&E :term:`NREN` service."""

    #: The type of Access Port
    ap_type: APType
    #: The corresponding :term:`SBP` of this Access Port.
    sbp: ServiceBindingPort


class NRENL3CoreServiceBlockInactive(
    ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="NRENL3CoreServiceBlock"
):
    """An inactive :term:`NREN` L3 Core service subscription. See :class:`NRENL3CoreServiceBlock`."""

    nren_ap_list: list[NRENAccessPortInactive] = Field(default_factory=list)


class NRENL3CoreServiceBlockProvisioning(
    NRENL3CoreServiceBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]
):
    """A provisioning :term:`NREN` L3 Core Service subscription. See :class:`NRENL3CoreServiceBlock`."""

    nren_ap_list: list[NRENAccessPortProvisioning]  # type: ignore[assignment]


class NRENL3CoreServiceBlock(NRENL3CoreServiceBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """An active :term:`NREN` L3 Core Service subscription block."""

    #: The list of Access Points where this service is present.
    nren_ap_list: list[NRENAccessPort]  # type: ignore[assignment]
Original line number Diff line number Diff line
@@ -43,9 +43,9 @@ class OpengearBlock(OpengearBlockProvisioning, lifecycle=[SubscriptionLifecycle.
    opengear_hostname: str
    #: The site where the Opengear device is located.
    opengear_site: SiteBlock
    #: The WAN address of the Opengear device.
    #: The :term:`WAN` address of the Opengear device.
    opengear_wan_address: ipaddress.IPv4Address
    #: The WAN netmask of the Opengear device.
    #: The :term:`WAN` netmask of the Opengear device.
    opengear_wan_netmask: ipaddress.IPv4Address
    #: The WAN gateway of the Opengear device.
    #: The :term:`WAN` gateway of the Opengear device.
    opengear_wan_gateway: ipaddress.IPv4Address
Original line number Diff line number Diff line
"""Service Binding Port.

A service binding port is used to logically attach an edge port to a customer service using a :term:`VLAN`.
"""

from typing import Annotated

from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle
from pydantic import Field

from gso.products.product_blocks.bgp_session import BGPSession, BGPSessionInactive, BGPSessionProvisioning
from gso.products.product_blocks.edge_port import EdgePortBlock, EdgePortBlockInactive, EdgePortBlockProvisioning
from gso.utils.shared_enums import SBPType
from gso.utils.types.ip_address import IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask

VLAN_ID = Annotated[int, Field(gt=0, lt=4096)]


class ServiceBindingPortInactive(
    ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="ServiceBindingPort"
):
    """A Service Binding Port that's currently inactive. See :class:`ServiceBindingPort`."""

    is_tagged: bool | None = None
    vlan_id: VLAN_ID | None = None
    sbp_type: SBPType | None = None
    ipv4_address: IPv4AddressType | None = None
    ipv4_mask: IPV4Netmask | None = None
    ipv6_address: IPv6AddressType | None = None
    ipv6_mask: IPV6Netmask | None = None
    custom_firewall_filters: bool | None = None
    geant_sid: str | None = None
    sbp_bgp_session_list: list[BGPSessionInactive] = Field(default_factory=list)
    edge_port: EdgePortBlockInactive | None = None


class ServiceBindingPortProvisioning(ServiceBindingPortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """A Service Binding Port that's currently being provisioned. See :class:`ServiceBindingPort`."""

    is_tagged: bool
    vlan_id: VLAN_ID | None = None
    sbp_type: SBPType
    ipv4_address: IPv4AddressType | None = None
    ipv4_mask: IPV4Netmask | None = None
    ipv6_address: IPv6AddressType | None = None
    ipv6_mask: IPV6Netmask | None = None
    custom_firewall_filters: bool
    geant_sid: str
    sbp_bgp_session_list: list[BGPSessionProvisioning]  # type: ignore[assignment]
    edge_port: EdgePortBlockProvisioning


class ServiceBindingPort(ServiceBindingPortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """A service binding port that is actively used in the network."""

    #: Whether this :term:`VLAN` is tagged or not.
    is_tagged: bool
    #: The :term:`VLAN` ID.
    vlan_id: VLAN_ID | None = None
    #: Is this service binding port layer 2 or 3?
    sbp_type: SBPType
    #: If layer 3, IPv4 resources.
    ipv4_address: IPv4AddressType | None = None
    #: IPV4 subnet mask.
    ipv4_mask: IPV4Netmask | None = None
    #: If layer 3, IPv6 resources.
    ipv6_address: IPv6AddressType | None = None
    #: IPV6 subnet mask.
    ipv6_mask: IPV6Netmask | None = None
    #: Any custom firewall filters that the partner may require.
    custom_firewall_filters: bool
    #: The GÉANT service ID of this binding port.
    geant_sid: str
    #: The :term:`BGP` sessions associated with this service binding port.
    sbp_bgp_session_list: list[BGPSession]  # type: ignore[assignment]
    #: The Edge Port on which this :term:`SBP` resides.
    edge_port: EdgePortBlock
+42 −0
Original line number Diff line number Diff line
"""Product types for Edge Port."""

from orchestrator.domain.base import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle

from gso.products.product_blocks.edge_port import (
    EdgePortBlock,
    EdgePortBlockInactive,
    EdgePortBlockProvisioning,
)


class EdgePortInactive(SubscriptionModel, is_base=True):
    """An Edge Port that is inactive."""

    edge_port: EdgePortBlockInactive


class EdgePortProvisioning(EdgePortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """An Edge Port that is being provisioned."""

    edge_port: EdgePortBlockProvisioning


class EdgePort(EdgePortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """An Edge Port that is active."""

    edge_port: EdgePortBlock


class ImportedEdgePortInactive(SubscriptionModel, is_base=True):
    """An imported, inactive Edge Port."""

    edge_port: EdgePortBlockInactive


class ImportedEdgePort(
    ImportedEdgePortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE]
):
    """An imported Edge Port that is currently active."""

    edge_port: EdgePortBlock
Original line number Diff line number Diff line
""":term:`NREN` L3 Core Service product type."""

from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle
from pydantic_forms.types import strEnum

from gso.products.product_blocks.nren_l3_core_service import (
    NRENL3CoreServiceBlock,
    NRENL3CoreServiceBlockInactive,
    NRENL3CoreServiceBlockProvisioning,
)


class NRENL3CoreServiceType(strEnum):
    """Available types of :term:`NREN` Layer 3 Core Services.

    The core services offered include GÉANT IP for R&E access, and the Internet Access Service.
    """

    GEANT_IP = "GÉANT IP"
    IAS = "IAS"


class NRENL3CoreServiceInactive(SubscriptionModel, is_base=True):
    """An inactive :term:`NREN` L3 Core Service subscription."""

    nren_l3_core_service_type: NRENL3CoreServiceType
    nren_l3_core_service: NRENL3CoreServiceBlockInactive


class NRENL3CoreServiceProvisioning(NRENL3CoreServiceInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
    """A :term:`NREN` L3 Core Service subscription that's being provisioned."""

    nren_l3_core_service_type: NRENL3CoreServiceType
    nren_l3_core_service: NRENL3CoreServiceBlockProvisioning


class NRENL3CoreService(NRENL3CoreServiceProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
    """An active :term:`NREN` L3 Core Service subscription."""

    nren_l3_core_service_type: NRENL3CoreServiceType
    nren_l3_core_service: NRENL3CoreServiceBlock


class ImportedNRENL3CoreServiceInactive(SubscriptionModel, is_base=True):
    """An imported, inactive :term:`NREN` L3 Core Service subscription."""

    nren_l3_core_service_type: NRENL3CoreServiceType
    nren_l3_core_service: NRENL3CoreServiceBlockInactive


class ImportedNRENL3CoreService(
    ImportedNRENL3CoreServiceInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE]
):
    """An imported :term:`NREN` L3 Core Service subscription."""

    nren_l3_core_service_type: NRENL3CoreServiceType
    nren_l3_core_service: NRENL3CoreServiceBlock
Original line number Diff line number Diff line
@@ -99,6 +99,8 @@ class KentikClient:

        If the site is not found, return an empty dict.

        .. vale off

        :param str site_slug: The name of the site, should be a three-letter slug like COR or POZ.
        """
        sites = self.get_sites()
Original line number Diff line number Diff line
@@ -10,7 +10,7 @@ from requests import HTTPError, Response
from requests.adapters import HTTPAdapter

from gso.settings import load_oss_params
from gso.utils.types.snmp import SNMPVersion
from gso.utils.shared_enums import SNMPVersion

logger = logging.getLogger(__name__)

Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ from gso.settings import load_oss_params
from gso.utils.device_info import (
    DEFAULT_SITE,
    FEASIBLE_IP_TRUNK_LAG_RANGE,
    FEASIBLE_SERVICES_LAG_RANGE,
    ROUTER_ROLE,
    TierInfo,
)
@@ -289,8 +290,8 @@ class NetboxClient:
            interface.lag = None
            interface.save()

    def get_available_lags(self, router_id: UUID) -> list[str]:
        """Return all available :term:`LAG` not assigned to a device."""
    def get_available_lags_in_range(self, router_id: UUID, lag_range: range) -> list[str]:
        """Return all available LAGs within a given range not assigned to a device."""
        router_name = Router.from_subscription(router_id).router.router_fqdn
        device = self.get_device_by_name(router_name)

@@ -299,12 +300,20 @@ class NetboxClient:
            interface["name"] for interface in self.netbox.dcim.interfaces.filter(device=device.name, type="lag")
        ]

        # Generate all feasible LAGs
        all_feasible_lags = [f"lag-{i}" for i in FEASIBLE_IP_TRUNK_LAG_RANGE]
        # Generate all feasible LAGs in the specified range
        all_feasible_lags = [f"lag-{i}" for i in lag_range]

        # Return available LAGs not assigned to the device
        return [lag for lag in all_feasible_lags if lag not in lag_interface_names]

    def get_available_lags(self, router_id: UUID) -> list[str]:
        """Return all available :term:`LAG` not assigned to a device."""
        return self.get_available_lags_in_range(router_id, FEASIBLE_IP_TRUNK_LAG_RANGE)

    def get_available_services_lags(self, router_id: UUID) -> list[str]:
        """Return all available Edge port LAGs not assigned to a device."""
        return self.get_available_lags_in_range(router_id, FEASIBLE_SERVICES_LAG_RANGE)

    @staticmethod
    def calculate_speed_bits_per_sec(speed: str) -> int:
        """Extract the numeric part from the speed."""
Original line number Diff line number Diff line
@@ -246,6 +246,20 @@ def get_active_site_subscriptions(includes: list[str] | None = None) -> list[Sub
    )


def get_active_edge_port_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve active Edge Port subscriptions.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]

    :return: A list of Subscription objects for Edge Ports.
    :rtype: list[Subscription]
    """
    return get_subscriptions(
        product_types=[ProductType.EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
    )


def get_site_by_name(site_name: str) -> Site:
    """Get a site by its name.

+4 −10
Original line number Diff line number Diff line
@@ -9,15 +9,13 @@ import json
import logging
import os
from pathlib import Path
from typing import Annotated

from orchestrator.types import UUIDstr
from pydantic import EmailStr, Field
from pydantic import EmailStr
from pydantic_forms.types import strEnum
from pydantic_settings import BaseSettings
from typing_extensions import Doc

from gso.utils.types.ip_address import PortNumber
from gso.utils.types.ip_address import IPV4Netmask, IPV6Netmask, PortNumber

logger = logging.getLogger(__name__)

@@ -62,16 +60,12 @@ class InfoBloxParams(BaseSettings):
    password: str


V4Netmask = Annotated[int, Field(ge=0, le=32), Doc("A valid netmask for an IPv4 network or address.")]
V6Netmask = Annotated[int, Field(ge=0, le=128), Doc("A valid netmask for an IPv6 network or address.")]


class V4NetworkParams(BaseSettings):
    """A set of parameters that describe an IPv4 network in InfoBlox."""

    containers: list[ipaddress.IPv4Network]
    networks: list[ipaddress.IPv4Network]
    mask: V4Netmask
    mask: IPV4Netmask


class V6NetworkParams(BaseSettings):
@@ -79,7 +73,7 @@ class V6NetworkParams(BaseSettings):

    containers: list[ipaddress.IPv6Network]
    networks: list[ipaddress.IPv6Network]
    mask: V6Netmask
    mask: IPV6Netmask


class ServiceNetworkParams(BaseSettings):
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
            "router_role": "Router role",

            "geant_s_sid": "GÉANT S-SID",
            "geant_sid": "GÉANT S-SID",
            "iptrunk_description": "IPtrunk description",
            "iptrunk_type": "IPtrunk type",
            "iptrunk_speed": "Capacity per port (in Gbits/s)",
@@ -32,8 +33,7 @@
            "migrate_to_different_site": "Migrating to a different Site",
            "remove_configuration": "Remove configuration from the router",
            "clean_up_ipam": "Clean up related entries in IPAM",
            "restore_isis_metric": "Restore ISIS metric to original value",
            "confirm_info": "Please verify this form looks correct."
            "restore_isis_metric": "Restore ISIS metric to original value"
        }
    },
    "workflow": {
@@ -44,17 +44,23 @@
        "create_router": "Create Router",
        "create_site": "Create Site",
        "create_switch": "Create Switch",
        "create_edge_port": "Create Edge Port",
        "create_nren_l3_core_service": "Create NREN L3 Core Service",
        "deploy_twamp": "Deploy TWAMP",
        "migrate_iptrunk": "Migrate IP Trunk",
        "migrate_nren_l3_core_service": "Migrate NREN L3 Core Service",
        "modify_isis_metric": "Modify the ISIS metric",
        "modify_site": "Modify Site",
        "modify_trunk_interface": "Modify IP Trunk interface",
        "modify_connection_strategy": "Modify connection strategy",
        "modify_router_kentik_license": "Modify device license in Kentik",
        "modify_edge_port": "Modify Edge Port",
        "modify_nren_l3_core_service": "Modify NREN L3 Core Service",
        "terminate_iptrunk": "Terminate IP Trunk",
        "terminate_router": "Terminate Router",
        "terminate_site": "Terminate Site",
        "terminate_switch": "Terminate Switch",
        "terminate_edge_port": "Terminate Edge Port",
        "redeploy_base_config": "Redeploy base config",
        "update_ibgp_mesh": "Update iBGP mesh",
        "create_imported_site": "NOT FOR HUMANS -- Import existing site",
@@ -63,15 +69,20 @@
        "create_imported_super_pop_switch": "NOT FOR HUMANS -- Import existing super PoP switch",
        "create_imported_office_router": "NOT FOR HUMANS -- Import existing office router",
        "create_imported_opengear": "NOT FOR HUMANS -- Import existing OpenGear",
        "create_imported_edge_port": "NOT FOR HUMANS -- Import existing Edge Port",
        "create_imported_nren_l3_core_service": "NOT FOR HUMANS -- Import existing NREN L3 Core Service",
        "import_site": "NOT FOR HUMANS -- Finalize import into a Site product",
        "import_router": "NOT FOR HUMANS -- Finalize import into a Router product",
        "import_iptrunk": "NOT FOR HUMANS -- Finalize import into an IP trunk product",
        "import_office_router": "NOT FOR HUMANS -- Finalize import into an Office router product",
        "import_super_pop_switch": "NOT FOR HUMANS -- Finalize import into a Super PoP switch",
        "import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear",
        "import_edge_port": "NOT FOR HUMANS -- Finalize import into an Edge Port",
        "import_nren_l3_core_service": "NOT FOR HUMANS -- Finalize import into a NREN L3 Core Service",
        "validate_iptrunk": "Validate IP Trunk configuration",
        "validate_router": "Validate Router configuration",
        "validate_switch": "Validate Switch configuration",
        "validate_edge_port": "Validate Edge Port",
        "task_validate_geant_products": "Validation task for GEANT products",
        "task_send_email_notifications": "Send email notifications for failed tasks",
        "task_create_partners": "Create partner task",
Original line number Diff line number Diff line
@@ -45,8 +45,8 @@ class TierInfo:
        return getattr(self, name)


# The range includes values from 1 to 10 (11 is not included)
FEASIBLE_IP_TRUNK_LAG_RANGE = range(1, 11)
FEASIBLE_SERVICES_LAG_RANGE = range(20, 51)

# Define default values
ROUTER_ROLE = {"name": "router", "slug": "router"}
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import re
from typing import TYPE_CHECKING
from uuid import UUID

from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice

from gso import settings
@@ -11,6 +12,7 @@ from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import subscriptions
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_all_partners
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType
@@ -75,6 +77,18 @@ def available_lags_choices(router_id: UUID) -> Choice | None:
    return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True))  # type: ignore[arg-type]


def available_service_lags_choices(router_id: UUID) -> Choice | None:
    """Return a list of available lags for a given router for services.

    For Nokia routers, return a list of available lags.
    For Juniper routers, return ``None``.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None
    side_a_ae_iface_list = NetboxClient().get_available_services_lags(router_id)
    return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True))  # type: ignore[arg-type]


def get_router_vendor(router_id: UUID) -> Vendor:
    """Retrieve the vendor of a router.

@@ -177,6 +191,16 @@ def active_router_selector() -> Choice:
    return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True))  # type: ignore[arg-type]


def active_pe_router_selector() -> Choice:
    """Generate a dropdown selector for choosing an active PE Router in an input form."""
    routers = {
        str(router.subscription_id): router.description
        for router in subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
    }

    return Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]


def active_switch_selector() -> Choice:
    """Generate a dropdown selector for choosing an active Switch in an input form."""
    switch_subscriptions = {
@@ -185,3 +209,42 @@ def active_switch_selector() -> Choice:
    }

    return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True))  # type: ignore[arg-type]


def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> Choice:
    """Generate a dropdown selector for choosing an active Edge Port in an input form."""
    edge_port_subscriptions = subscriptions.get_active_edge_port_subscriptions(
        includes=["subscription_id", "description", "customer_id"]
    )

    if partner_id:
        # ``partner_id`` is set, so we will filter accordingly.
        edge_port_subscriptions = list(
            filter(lambda subscription: bool(subscription["customer_id"] == partner_id), edge_port_subscriptions)
        )

    edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions}

    return Choice(
        "Select an Edge Port",
        zip(edge_ports.keys(), edge_ports.items(), strict=True),  # type: ignore[arg-type]
    )


def partner_choice() -> Choice:
    """Return a Choice object containing a list of available partners."""
    partners = {partner["partner_id"]: partner["name"] for partner in get_all_partners()}

    return Choice("Select a partner", zip(partners.values(), partners.items(), strict=True))  # type: ignore[arg-type]


def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None:
    """Validate the number of edge port members based on the :term:`LACP` setting.

    :param number_of_members: The number of members to validate.
    :param enable_lacp: Whether :term:`LACP` is enabled or not.
    :raises ValueError: If the number of members is greater than 1 and :term:`LACP` is disabled.
    """
    if number_of_members > 1 and not enable_lacp:
        err_msg = "Number of members must be 1 if LACP is disabled."
        raise ValueError(err_msg)
Original line number Diff line number Diff line
"""Shared choices for the different models."""

from enum import StrEnum

from pydantic_forms.types import strEnum


@@ -15,3 +17,25 @@ class ConnectionStrategy(strEnum):

    IN_BAND = "IN BAND"
    OUT_OF_BAND = "OUT OF BAND"


class SNMPVersion(StrEnum):
    """An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""

    V2C = "v2c"
    V3 = "v3"


class APType(strEnum):
    """Enumerator of the types of Access Port."""

    PRIMARY = "PRIMARY"
    BACKUP = "BACKUP"
    LOAD_BALANCED = "LOAD_BALANCED"


class SBPType(strEnum):
    """Enumerator for the two allowed types of service binding port: layer 2 or layer 3."""

    L2 = "l2"
    L3 = "l3"
Original line number Diff line number Diff line
@@ -18,13 +18,29 @@ def validate_ipv4_or_ipv6(value: str) -> str:
        return value


def validate_ipv4_or_ipv6_network(value: str) -> str:
    """Validate that a value is a valid IPv4 or IPv6 network."""
    try:
        ipaddress.ip_network(value)
    except ValueError as e:
        msg = "Enter a valid IPv4 or IPv6 network."
        raise ValueError(msg) from e
    else:
        return value


def _str(value: Any) -> str:
    return str(value)


IPv4AddressType = Annotated[ipaddress.IPv4Address, PlainSerializer(_str, return_type=str, when_used="always")]
IPv4NetworkType = Annotated[ipaddress.IPv4Network, PlainSerializer(_str, return_type=str, when_used="always")]
IPv6AddressType = Annotated[ipaddress.IPv6Address, PlainSerializer(_str, return_type=str, when_used="always")]
IPv6NetworkType = Annotated[ipaddress.IPv6Network, PlainSerializer(_str, return_type=str, when_used="always")]
IPAddress = Annotated[str, AfterValidator(validate_ipv4_or_ipv6)]
IPNetwork = Annotated[str, AfterValidator(validate_ipv4_or_ipv6_network)]
IPV4Netmask = Annotated[int, Field(ge=0, le=32), Doc("A valid netmask for an IPv4 network or address.")]
IPV6Netmask = Annotated[int, Field(ge=0, le=128), Doc("A valid netmask for an IPv6 network or address.")]
PortNumber = Annotated[
    int,
    Field(

gso/utils/types/snmp.py

deleted100644 → 0
+0 −10
Original line number Diff line number Diff line
"""An enumerator of SNMP version numbers."""

from enum import StrEnum


class SNMPVersion(StrEnum):
    """An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""

    V2C = "v2c"
    V3 = "v3"
Original line number Diff line number Diff line
@@ -76,3 +76,20 @@ LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partner
LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partners")
LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks")

#  Edge port workflows
LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.modify_edge_port", "modify_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.create_imported_edge_port", "create_imported_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_port")

#  NREN L3 Core Service workflows
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.create_nren_l3_core_service", "create_nren_l3_core_service")
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.modify_nren_l3_core_service", "modify_nren_l3_core_service")
LazyWorkflowInstance(
    "gso.workflows.nren_l3_core_service.create_imported_nren_l3_core_service", "create_imported_nren_l3_core_service"
)
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.import_nren_l3_core_service", "import_nren_l3_core_service")
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.migrate_nren_l3_core_service", "migrate_nren_l3_core_service")
Original line number Diff line number Diff line
"""A creation workflow for adding a new edge port to the network."""

from typing import Annotated, Any, Self
from uuid import uuid4

from annotated_types import Len
from orchestrator import step, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.validators import validate_unique_list
from pynetbox.models.dcim import Interfaces

from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
from gso.products.product_types.edge_port import EdgePortInactive, EdgePortProvisioning
from gso.products.product_types.router import Router
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
    active_pe_router_selector,
    available_interfaces_choices,
    available_service_lags_choices,
    partner_choice,
    validate_edge_port_number_of_members_based_on_lacp,
)
from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity
from gso.utils.types.tt_number import TTNumber


def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather information to create a new Edge Port."""

    class CreateEdgePortForm(FormPage):
        model_config = ConfigDict(title=product_name)

        tt_number: TTNumber
        node: active_pe_router_selector()  # type: ignore[valid-type]
        partner: partner_choice()  # type: ignore[valid-type]
        service_type: EdgePortType
        enable_lacp: bool = False
        speed: PhysicalPortCapacity
        encapsulation: EncapsulationType = EncapsulationType.DOT1Q
        number_of_members: int
        minimum_links: int
        mac_address: str | None = None
        ignore_if_down: bool = False
        geant_ga_id: str | None = None

        @model_validator(mode="after")
        def validate_number_of_members(self) -> Self:
            validate_edge_port_number_of_members_based_on_lacp(
                enable_lacp=self.enable_lacp, number_of_members=self.number_of_members
            )
            return self

    initial_user_input = yield CreateEdgePortForm

    class EdgePortLAGMember(LAGMember):
        interface_name: available_interfaces_choices(  # type: ignore[valid-type]
            initial_user_input.node, initial_user_input.speed
        )

    lag_ae_members = Annotated[
        list[EdgePortLAGMember],
        AfterValidator(validate_unique_list),
        Len(
            min_length=initial_user_input.number_of_members,
            max_length=initial_user_input.number_of_members,
        ),
    ]

    class SelectInterfaceForm(FormPage):
        model_config = ConfigDict(title="Select Interfaces")

        name: available_service_lags_choices(initial_user_input.node)  # type: ignore[valid-type]
        description: str | None = None
        ae_members: lag_ae_members

    interface_form_input_data = yield SelectInterfaceForm
    return initial_user_input.model_dump() | interface_form_input_data.model_dump()


@step("Create subscription")
def create_subscription(product: UUIDstr, partner: UUIDstr) -> State:
    """Create a new subscription object."""
    subscription = EdgePortInactive.from_product_id(product, partner)

    return {
        "subscription": subscription,
        "subscription_id": subscription.subscription_id,
    }


@step("Initialize subscription")
def initialize_subscription(
    subscription: EdgePortInactive,
    node: UUIDstr,
    service_type: EdgePortType,
    speed: PhysicalPortCapacity,
    encapsulation: EncapsulationType,
    name: str,
    minimum_links: int,
    geant_ga_id: str | None,
    mac_address: str | None,
    partner: str,
    enable_lacp: bool,  # noqa: FBT001
    ignore_if_down: bool,  # noqa: FBT001
    ae_members: list[dict[str, Any]],
    description: str | None = None,
) -> State:
    """Initialise the subscription object in the service database."""
    router = Router.from_subscription(node).router
    subscription.edge_port.edge_port_node = router
    subscription.edge_port.edge_port_type = service_type
    subscription.edge_port.edge_port_enable_lacp = enable_lacp
    subscription.edge_port.edge_port_member_speed = speed
    subscription.edge_port.edge_port_encapsulation = encapsulation
    subscription.edge_port.edge_port_name = name
    subscription.edge_port.edge_port_minimum_links = minimum_links
    subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
    subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
    subscription.edge_port.edge_port_mac_address = mac_address
    partner_name = get_partner_by_id(partner).name
    subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner_name}, {geant_ga_id or ""}"
    subscription.edge_port.edge_port_description = description
    for member in ae_members:
        subscription.edge_port.edge_port_ae_members.append(
            EdgePortAEMemberBlockInactive.new(subscription_id=uuid4(), **member)
        )
    subscription = EdgePortProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)

    return {"subscription": subscription}


@step("Reserve interfaces in NetBox")
def reserve_interfaces_in_netbox(subscription: EdgePortProvisioning) -> State:
    """Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
    nbclient = NetboxClient()
    edge_port = subscription.edge_port
    # Create :term:`LAG` interfaces
    lag_interface: Interfaces = nbclient.create_interface(
        iface_name=edge_port.edge_port_name,
        interface_type="lag",
        device_name=edge_port.edge_port_node.router_fqdn,
        description=str(subscription.subscription_id),
        enabled=True,
    )
    # Attach physical interfaces to :term:`LAG`
    # Update interface description to subscription ID
    # Reserve interfaces
    for interface in edge_port.edge_port_ae_members:
        nbclient.attach_interface_to_lag(
            device_name=edge_port.edge_port_node.router_fqdn,
            lag_name=lag_interface.name,
            iface_name=interface.interface_name,
            description=str(subscription.subscription_id),
        )
        nbclient.reserve_interface(
            device_name=edge_port.edge_port_node.router_fqdn,
            iface_name=interface.interface_name,
        )
    return {
        "subscription": subscription,
    }


@step("Allocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePortProvisioning) -> None:
    """Allocate the interfaces in NetBox."""
    for interface in subscription.edge_port.edge_port_ae_members:
        fqdn = subscription.edge_port.edge_port_node.router_fqdn
        iface_name = interface.interface_name
        if not fqdn or not iface_name:
            msg = "FQDN and/or interface name missing in subscription"
            raise ProcessFailureError(msg, details=subscription.subscription_id)

        NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)


@step("[DRY RUN] Create edge port")
def create_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
    """Create a new edge port in the network as a dry run."""
    extra_vars = {
        "dry_run": True,
        "subscription": subscription,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
        "verb": "create",
    }

    return {
        "playbook_name": "edge_port.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
        "extra_vars": extra_vars,
    }


@step("[FOR REAL] Create edge port")
def create_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
    """Create a new edge port in the network for real."""
    extra_vars = {
        "dry_run": False,
        "subscription": subscription,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
        "verb": "create",
    }

    return {
        "playbook_name": "edge_port.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
        "extra_vars": extra_vars,
    }


@workflow(
    "Create Edge Port",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_edge_port() -> StepList:
    """Create a new edge port in the network.

    * Create and initialise the subscription object in the service database
    * Deploy configuration on the new edge port, first as a dry run
    * allocate :term:`LAG` and :term:`LAG` members in the Netbox.
    """
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> reserve_interfaces_in_netbox
        >> lso_interaction(create_edge_port_dry)
        >> lso_interaction(create_edge_port_real)
        >> allocate_interfaces_in_netbox
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
"""A creation workflow that adds an existing Edge Port to the DB."""

from typing import Annotated, Any
from uuid import uuid4

from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic import AfterValidator, ConfigDict
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import validate_unique_list

from gso.products import ProductName
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
from gso.products.product_types.edge_port import EdgePortInactive, ImportedEdgePortInactive
from gso.products.product_types.router import Router
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.helpers import active_pe_router_selector
from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity


@step("Create subscription")
def create_subscription(partner: str) -> State:
    """Create a new subscription object."""
    partner_id = get_partner_by_name(partner)["partner_id"]
    product_id = get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT)
    subscription = ImportedEdgePortInactive.from_product_id(product_id, partner_id)

    return {
        "subscription": subscription,
        "subscription_id": subscription.subscription_id,
    }


def initial_input_form_generator() -> FormGenerator:
    """Generate a form that is filled in using information passed through the :term:`API` endpoint."""

    class ImportEdgePort(FormPage):
        model_config = ConfigDict(title="Import Router")

        node: active_pe_router_selector()  # type: ignore[valid-type]
        partner: str
        service_type: EdgePortType
        enable_lacp: bool
        speed: PhysicalPortCapacity
        encapsulation: EncapsulationType = EncapsulationType.DOT1Q
        minimum_links: int
        mac_address: str | None = None
        ignore_if_down: bool = False
        geant_ga_id: str | None = None
        description: str | None = None
        name: str
        ae_members: Annotated[list[LAGMember], AfterValidator(validate_unique_list)]

    user_input = yield ImportEdgePort

    return user_input.model_dump()


@step("Initialize subscription")
def initialize_subscription(
    subscription: EdgePortInactive,
    node: UUIDstr,
    service_type: EdgePortType,
    speed: PhysicalPortCapacity,
    encapsulation: EncapsulationType,
    name: str,
    minimum_links: int,
    geant_ga_id: str | None,
    mac_address: str | None,
    partner: str,
    enable_lacp: bool,  # noqa: FBT001
    ignore_if_down: bool,  # noqa: FBT001
    ae_members: list[dict[str, Any]],
    description: str | None = None,
) -> State:
    """Initialise the subscription object in the service database."""
    router = Router.from_subscription(node).router
    subscription.edge_port.edge_port_node = router
    subscription.edge_port.edge_port_type = service_type
    subscription.edge_port.edge_port_enable_lacp = enable_lacp
    subscription.edge_port.edge_port_member_speed = speed
    subscription.edge_port.edge_port_encapsulation = encapsulation
    subscription.edge_port.edge_port_name = name
    subscription.edge_port.edge_port_minimum_links = minimum_links
    subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
    subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
    subscription.edge_port.edge_port_mac_address = mac_address
    subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner}, {geant_ga_id or ""}"
    subscription.edge_port.edge_port_description = description
    for member in ae_members:
        subscription.edge_port.edge_port_ae_members.append(
            EdgePortAEMemberBlockInactive.new(subscription_id=uuid4(), **member)
        )

    return {"subscription": subscription}


@workflow(
    "Import Edge Port",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_edge_port() -> StepList:
    """Import a Edge Port without provisioning it."""
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
"""A modification workflow for migrating an ImportedEdgePort to an EdgePort subscription."""

from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products import ProductName
from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort
from gso.services.subscriptions import get_product_id_by_name


@step("Create new Edge Port subscription")
def import_edge_port_subscription(subscription_id: UUIDstr) -> State:
    """Take an ImportedEdgePort subscription, and turn it into an EdgePort subscription."""
    old_edge_port = ImportedEdgePort.from_subscription(subscription_id)
    new_subscription_id = get_product_id_by_name(ProductName.EDGE_PORT)
    new_subscription = EdgePort.from_other_product(old_edge_port, new_subscription_id)  # type: ignore[arg-type]

    return {"subscription": new_subscription}


@workflow("Import Edge Port", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_edge_port() -> StepList:
    """Modify an ImportedEdgePort subscription into an EdgePort subscription to complete the import."""
    return (
        init >> store_process_subscription(Target.MODIFY) >> unsync >> import_edge_port_subscription >> resync >> done
    )
Original line number Diff line number Diff line
"""Modify an existing edge port subscription."""

from typing import Annotated, Any, Self
from uuid import uuid4

from annotated_types import Len
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.workflow import StepList, begin, conditional, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import ReadOnlyField, validate_unique_list

from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock, EncapsulationType
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
    available_interfaces_choices,
    available_interfaces_choices_including_current_members,
    validate_edge_port_number_of_members_based_on_lacp,
)
from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity
from gso.utils.types.tt_number import TTNumber


def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Gather input from the operator on what to change about the selected edge port subscription."""
    subscription = EdgePort.from_subscription(subscription_id)

    class ModifyEdgePortForm(FormPage):
        model_config = ConfigDict(title="Modify Edge Port")

        tt_number: TTNumber
        enable_lacp: bool = subscription.edge_port.edge_port_enable_lacp
        member_speed: PhysicalPortCapacity = subscription.edge_port.edge_port_member_speed
        encapsulation: EncapsulationType = subscription.edge_port.edge_port_encapsulation
        number_of_members: int = len(subscription.edge_port.edge_port_ae_members)
        minimum_links: int | None = subscription.edge_port.edge_port_minimum_links or None
        mac_address: str | None = subscription.edge_port.edge_port_mac_address or None
        ignore_if_down: bool = subscription.edge_port.edge_port_ignore_if_down
        geant_ga_id: str | None = subscription.edge_port.edge_port_geant_ga_id or None

        @model_validator(mode="after")
        def validate_number_of_members(self) -> Self:
            validate_edge_port_number_of_members_based_on_lacp(
                enable_lacp=self.enable_lacp, number_of_members=self.number_of_members
            )
            return self

    user_input = yield ModifyEdgePortForm

    class EdgePortLAGMember(LAGMember):
        interface_name: (  # type: ignore[valid-type]
            available_interfaces_choices_including_current_members(
                subscription.edge_port.edge_port_node.owner_subscription_id,
                user_input.member_speed,
                subscription.edge_port.edge_port_ae_members,
            )
            if user_input.member_speed == subscription.edge_port.edge_port_member_speed
            else (
                available_interfaces_choices(
                    subscription.edge_port.edge_port_node.owner_subscription_id, user_input.member_speed
                )
            )
        )

    lag_ae_members = Annotated[
        list[EdgePortLAGMember],
        AfterValidator(validate_unique_list),
        Len(
            min_length=user_input.number_of_members,
            max_length=user_input.number_of_members,
        ),
    ]

    current_lag_ae_members = (
        [
            EdgePortLAGMember(
                interface_name=iface.interface_name,
                interface_description=iface.interface_description,
            )
            for iface in subscription.edge_port.edge_port_ae_members
        ]
        if user_input.member_speed == subscription.edge_port.edge_port_member_speed
        else []
    )

    class ModifyEdgePortInterfaceForm(FormPage):
        model_config = ConfigDict(title="Modify Edge Port Interface")

        name: ReadOnlyField(subscription.edge_port.edge_port_name, default_type=str)  # type: ignore[valid-type]
        description: str | None = subscription.edge_port.edge_port_description or None
        ae_members: lag_ae_members = current_lag_ae_members

    interface_form_input = yield ModifyEdgePortInterfaceForm

    capacity_has_changed = (
        user_input.member_speed != subscription.edge_port.edge_port_member_speed
        or user_input.number_of_members != len(subscription.edge_port.edge_port_ae_members)
        or any(
            old_interface.interface_name
            not in [new_interface.interface_name for new_interface in interface_form_input.ae_members]
            for old_interface in subscription.edge_port.edge_port_ae_members
        )
        or len(subscription.edge_port.edge_port_ae_members) != len(interface_form_input.ae_members)
    )
    return user_input.model_dump() | interface_form_input.model_dump() | {"capacity_has_changed": capacity_has_changed}


@step("Modify edge port subscription.")
def modify_edge_port_subscription(
    subscription: EdgePort,
    member_speed: PhysicalPortCapacity,
    encapsulation: EncapsulationType,
    minimum_links: int,
    mac_address: str | None,
    geant_ga_id: str | None,
    enable_lacp: bool,  # noqa: FBT001
    ae_members: list[dict[str, str]],
    ignore_if_down: bool,  # noqa: FBT001
    description: str | None = None,
) -> State:
    """Modify the edge port subscription with the given parameters."""
    previous_ae_members = [
        {
            "interface_name": member.interface_name,
            "interface_description": member.interface_description,
        }
        for member in subscription.edge_port.edge_port_ae_members
    ]
    removed_ae_members = [member for member in previous_ae_members if member not in ae_members]
    subscription.edge_port.edge_port_enable_lacp = enable_lacp
    subscription.edge_port.edge_port_member_speed = member_speed
    subscription.edge_port.edge_port_encapsulation = encapsulation
    subscription.edge_port.edge_port_minimum_links = minimum_links
    subscription.edge_port.edge_port_mac_address = mac_address
    subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
    subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
    subscription.edge_port.edge_port_description = description
    subscription.description = (
        f"Edge Port {subscription.edge_port.edge_port_name} on"
        f" {subscription.edge_port.edge_port_node.router_fqdn},"
        f" {get_partner_by_id(subscription.customer_id).name}, {geant_ga_id or ""}"
    )
    subscription.edge_port.edge_port_ae_members.clear()
    for member in ae_members:
        subscription.edge_port.edge_port_ae_members.append(EdgePortAEMemberBlock.new(subscription_id=uuid4(), **member))

    return {
        "subscription": subscription,
        "removed_ae_members": removed_ae_members,
        "previous_ae_members": previous_ae_members,
    }


@step("Update interfaces in NetBox")
def update_interfaces_in_netbox(
    subscription: EdgePort, removed_ae_members: list[dict], previous_ae_members: list[dict]
) -> State:
    """Update the interfaces in NetBox."""
    nbclient = NetboxClient()
    # Free removed interfaces
    for removed_member in removed_ae_members:
        nbclient.free_interface(subscription.edge_port.edge_port_node.router_fqdn, removed_member["interface_name"])
    # Attach physical interfaces to :term:`LAG`
    # Update interface description to subscription ID
    # Reserve interfaces
    for member in subscription.edge_port.edge_port_ae_members:
        if any(prev_member["interface_name"] == member.interface_name for prev_member in previous_ae_members):
            continue
        nbclient.attach_interface_to_lag(
            device_name=subscription.edge_port.edge_port_node.router_fqdn,
            lag_name=subscription.edge_port.edge_port_name,
            iface_name=member.interface_name,
            description=str(subscription.subscription_id),
        )
        nbclient.reserve_interface(subscription.edge_port.edge_port_node.router_fqdn, member.interface_name)

    return {"subscription": subscription}


@step("[DRY RUN] Update edge port configuration.")
def update_edge_port_dry(
    subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[dict]
) -> LSOState:
    """Perform a dry run of updating the edge port configuration."""
    extra_vars = {
        "subscription": subscription,
        "dry_run": True,
        "verb": "update",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} "
        f"- Update Edge Port {subscription["edge_port"]["edge_port_name"]}"
        f" on {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}",
        "removed_ae_members": removed_ae_members,
    }

    return {
        "playbook_name": "edge_ports.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}}},
        "extra_vars": extra_vars,
        "subscription": subscription,
    }


@step("[FOR REAL] Update edge port configuration.")
def update_edge_port_real(
    subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[str]
) -> LSOState:
    """Update the edge port configuration."""
    extra_vars = {
        "subscription": subscription,
        "dry_run": False,
        "verb": "update",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} "
        f"- Update Edge Port {subscription["edge_port"]["edge_port_name"]}"
        f" on {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}",
        "removed_ae_members": removed_ae_members,
    }

    return {
        "subscription": subscription,
        "playbook_name": "edge_ports.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
        "extra_vars": extra_vars,
    }


@step("Allocate/Deallocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePort, previous_ae_members: list[dict]) -> None:
    """Allocate the new interfaces in NetBox and detach the old ones from the :term:`LAG`."""
    nbclient = NetboxClient()
    for member in subscription.edge_port.edge_port_ae_members:
        if any(member.interface_name == prev_member["interface_name"] for prev_member in previous_ae_members):
            continue
        nbclient.allocate_interface(
            device_name=subscription.edge_port.edge_port_node.router_fqdn,
            iface_name=member.interface_name,
        )

    # detach the old interfaces from lag
    nbclient.detach_interfaces_from_lag(
        device_name=subscription.edge_port.edge_port_node.router_fqdn, lag_name=subscription.edge_port.edge_port_name
    )


@workflow(
    "Modify Edge Port",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.MODIFY,
)
def modify_edge_port() -> StepList:
    """Modify a new edge port in the network.

    * Modify the subscription object in the service database
    * Modify configuration on the new edge port, first as a dry run
    * Change :term:`LAG` and :term:`LAG` members in the Netbox.
    """
    capacity_has_changed = conditional(lambda state: state["capacity_has_changed"])
    return (
        begin
        >> store_process_subscription(Target.MODIFY)
        >> unsync
        >> modify_edge_port_subscription
        >> capacity_has_changed(update_interfaces_in_netbox)
        >> capacity_has_changed(lso_interaction(update_edge_port_dry))
        >> capacity_has_changed(lso_interaction(update_edge_port_real))
        >> capacity_has_changed(allocate_interfaces_in_netbox)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
"""Terminate an edge port in the network."""

from typing import Any

from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr

from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.types.tt_number import TTNumber


def initial_input_form_generator() -> FormGenerator:
    """Let the operator decide whether to delete configuration on the router, and clear up :term:`IPAM` resources."""

    class TerminateForm(FormPage):
        tt_number: TTNumber

    user_input = yield TerminateForm
    return user_input.model_dump()


@step("[DRY RUN] Remove Edge Port")
def remove_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> dict[str, Any]:
    """Remove an edge port from the network."""
    extra_vars = {
        "subscription": subscription,
        "dry_run": True,
        "verb": "terminate",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
    }

    return {
        "subscription": subscription,
        "playbook_name": "edge_port.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
        "extra_vars": extra_vars,
    }


@step("[FOR REAL] Remove Edge Port")
def remove_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
    """Remove an edge port from the network."""
    extra_vars = {
        "subscription": subscription,
        "dry_run": False,
        "verb": "terminate",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
    }

    return {
        "subscription": subscription,
        "playbook_name": "edge_port.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
        "extra_vars": extra_vars,
    }


@step("Netbox Clean Up")
def netbox_clean_up(subscription: EdgePort) -> None:
    """Update Netbox to remove the edge port :term:`LAG` interface and all the :term:`LAG` members."""
    nbclient = NetboxClient()

    for member in subscription.edge_port.edge_port_ae_members:
        nbclient.free_interface(subscription.edge_port.edge_port_node.router_fqdn, member.interface_name)

    nbclient.delete_interface(subscription.edge_port.edge_port_node.router_fqdn, subscription.edge_port.edge_port_name)


@workflow(
    "Terminate Edge Port",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.TERMINATE,
)
def terminate_edge_port() -> StepList:
    """Terminate a new edge port in the network."""
    return (
        begin
        >> store_process_subscription(Target.TERMINATE)
        >> unsync
        >> lso_interaction(remove_edge_port_dry)
        >> lso_interaction(remove_edge_port_real)
        >> netbox_clean_up
        >> set_status(SubscriptionLifecycle.TERMINATED)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
"""Workflow for validating an existing Edge port subscription."""

from typing import Any

from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, anonymous_lso_interaction
from gso.services.netbox_client import NetboxClient


@step("Prepare required keys in state")
def prepare_state(subscription_id: UUIDstr) -> State:
    """Add required keys to the state for the workflow to run successfully."""
    edge_port = EdgePort.from_subscription(subscription_id)

    return {"subscription": edge_port}


@step("Verify NetBox entries")
def verify_netbox_entries(subscription: EdgePort) -> None:
    """Validate required entries for an edge port in NetBox."""
    nbclient = NetboxClient()
    netbox_errors = []

    #  Raises en exception when not found.
    lag = nbclient.get_interface_by_name_and_device(
        subscription.edge_port.edge_port_name, subscription.edge_port.edge_port_node.router_fqdn
    )
    if lag.description != str(subscription.subscription_id):
        netbox_errors.append(
            f"Incorrect description for '{lag}', expected "
            f"'{subscription.subscription_id}' but got '{lag.description}'"
        )
    if not lag.enabled:
        netbox_errors.append(f"NetBox interface '{lag}' is not enabled.")
    for member in subscription.edge_port.edge_port_ae_members:
        interface = nbclient.get_interface_by_name_and_device(
            member.interface_name, subscription.edge_port.edge_port_node.router_fqdn
        )
        if interface.description != str(subscription.subscription_id):
            netbox_errors.append(
                f"Incorrect description for '{member.interface_name}', expected "
                f"'{subscription.subscription_id}' but got '{interface.description}'"
            )
        if not interface.enabled:
            netbox_errors.append(f"NetBox interface '{member.interface_name}' is not enabled.")

    if netbox_errors:
        raise ProcessFailureError(message="NetBox misconfiguration(s) found", details=str(netbox_errors))


@step("Check base config for drift")
def verify_base_config(subscription: dict[str, Any]) -> LSOState:
    """Workflow step for running a playbook that checks whether base config has drifted."""
    return {
        "playbook_name": "edge_port.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
        "extra_vars": {
            "dry_run": True,
            "subscription": subscription,
            "verb": "create",
            "is_verification_workflow": "true",
        },
    }


@workflow(
    "Validate Edge Port Configuration", target=Target.SYSTEM, initial_input_form=wrap_modify_initial_input_form(None)
)
def validate_edge_port() -> StepList:
    """Validate an existing, active Edge port subscription.

    * Check correct configuration of interfaces in NetBox.
    * Verify create Edge port configuration.
    """
    return (
        begin
        >> store_process_subscription(Target.SYSTEM)
        >> prepare_state
        >> verify_netbox_entries
        >> anonymous_lso_interaction(verify_base_config)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
@@ -255,7 +255,7 @@ def dig_all_hosts_v6(new_ipv6_network: str) -> None:

@step("Ping all hosts in the assigned IPv4 network")
def ping_all_hosts_v4(new_ipv4_network: str) -> None:
    """Ping all hosts in the IPv4 network to verify they're not in use."""
    """Ping all hosts in the IPv4 network to verify they are not in use."""
    unavailable_hosts = [host for host in IPv4Network(new_ipv4_network) if ping(str(host), timeout=1)]

    if unavailable_hosts:
@@ -265,7 +265,7 @@ def ping_all_hosts_v4(new_ipv4_network: str) -> None:

@step("Ping all hosts in the assigned IPv6 network")
def ping_all_hosts_v6(new_ipv6_network: str) -> State:
    """Ping all hosts in the IPv6 network to verify they're not in use."""
    """Ping all hosts in the IPv6 network to verify they are not in use."""
    unavailable_hosts = [host for host in IPv6Network(new_ipv6_network) if ping(str(host), timeout=1)]

    if unavailable_hosts:
Original line number Diff line number Diff line
@@ -89,7 +89,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
            ):
                #  We want to stay on the same site, so all routers that are in different sites get skipped.
                continue
            #  If migrate_to_different_site is true, we can add ALL routers to the result map
            #  If migrate_to_different_site is true, we can add *all* routers to the result map
            routers[str(router_id)] = router["description"]

    new_router_enum = Choice("Select a new router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]
@@ -204,7 +204,7 @@ def calculate_old_side_data(subscription: Iptrunk, replace_index: int) -> State:

@step("Check Optical PRE levels on the trunk endpoint")
def check_ip_trunk_optical_levels_pre(subscription: Iptrunk) -> LSOState:
    """Check Optical PRE levels on the trunk."""
    """Check Optical levels on the trunk before migration."""
    extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "optical_pre"}

    return {
@@ -252,7 +252,7 @@ def check_ip_trunk_optical_levels_post(
def check_ip_trunk_lldp(
    subscription: Iptrunk, new_node: Router, new_lag_member_interfaces: list[dict], replace_index: int
) -> LSOState:
    """Check LLDP on the new trunk endpoints."""
    """Check :term:`LLDP` on the new trunk endpoints."""
    extra_vars = {
        "wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
        "new_node": json.loads(json_dumps(new_node)),
@@ -489,7 +489,7 @@ def update_remaining_side_bfd_real(

@step("Check BFD session over trunk")
def check_ip_trunk_bfd(subscription: Iptrunk, new_node: Router, replace_index: int) -> LSOState:
    """Check BFD session across the new trunk."""
    """Check :term:`BFD` session across the new trunk."""
    extra_vars = {
        "wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
        "new_node": json.loads(json_dumps(new_node)),
@@ -830,7 +830,7 @@ def migrate_iptrunk() -> StepList:
    * Deploy a new :term:`ISIS` interface between routers A and C
    * Wait for operator confirmation that :term:`ISIS` is behaving as expected
    * Restore the old :term:`ISIS` metric on the new trunk
    * Delete the old, disabled configuration on the routers, first as a dry run
    * Delete the old configuration from the routers, first as a dry run
    * Reflect the changes made in :term:`IPAM`
    * Update the subscription model in the database
    * Update the reserved interfaces in Netbox
Original line number Diff line number Diff line
@@ -205,7 +205,7 @@ def check_ip_trunk_connectivity(subscription: Iptrunk) -> LSOState:

@step("Check LLDP on the trunk endpoints")
def check_ip_trunk_lldp(subscription: Iptrunk) -> LSOState:
    """Check LLDP on trunk endpoints."""
    """Check :term:`LLDP` on trunk endpoints."""
    extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "lldp"}

    return {
Original line number Diff line number Diff line
@@ -210,7 +210,7 @@ def validate_iptrunk() -> StepList:
    * Verify that the :term:`LAG` interfaces are correctly configured in :term:`IPAM`.
    * Check correct configuration of interfaces in NetBox.
    * Verify the configuration on both sides of the trunk is intact.
    * Check the ISIS metric of the trunk.
    * Check the :term:`ISIS` metric of the trunk.
    * Verify that TWAMP configuration is correct.

    If a trunk has a Juniper router on both sides, it is considered legacy and does not require validation.
Original line number Diff line number Diff line
"""A creation workflow for adding an existing GEANT IP to the service database."""

from uuid import uuid4

from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic import BaseModel
from pydantic_forms.types import UUIDstr

from gso.products import ProductName
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.nren_l3_core_service import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPortInactive
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import ImportedGeantIPInactive
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.shared_enums import SBPType
from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask


def initial_input_form_generator() -> FormGenerator:
    """Take all information passed to this workflow by the :term:`API` endpoint that was called."""

    class BaseBGPPeer(BaseModel):
        bfd_enabled: bool = False
        bfd_interval: int | None = None
        bfd_multiplier: int | None = None
        has_custom_policies: bool = False
        authentication_key: str
        multipath_enabled: bool = False
        send_default_route: bool = False
        is_passive: bool = False
        peer_address: IPAddress
        families: list[IPFamily]
        is_multi_hop: bool
        rtbh_enabled: bool

    class ServiceBindingPort(BaseModel):
        edge_port: UUIDstr
        ap_type: str
        geant_sid: str
        sbp_type: SBPType = SBPType.L3
        is_tagged: bool = False
        vlan_id: VLAN_ID
        custom_firewall_filters: bool = False
        ipv4_address: IPv4AddressType
        ipv4_mask: IPV4Netmask
        ipv6_address: IPv6AddressType
        ipv6_mask: IPV6Netmask
        rtbh_enabled: bool = True
        is_multi_hop: bool = True
        bgp_peers: list[BaseBGPPeer]

    class ImportGeantIPForm(FormPage):
        partner: str
        service_binding_ports: list[ServiceBindingPort]

    user_input = yield ImportGeantIPForm

    return user_input.model_dump()


@step("Create subscription")
def create_subscription(partner: str) -> dict:
    """Create a new subscription object in the database."""
    partner_id = get_partner_by_name(partner)["partner_id"]
    product_id = get_product_id_by_name(ProductName.IMPORTED_GEANT_IP)
    subscription = ImportedGeantIPInactive.from_product_id(product_id, partner_id)
    return {"subscription": subscription, "subscription_id": subscription.subscription_id}


@step("Initialize subscription")
def initialize_subscription(subscription: ImportedGeantIPInactive, service_binding_ports: list) -> dict:
    """Initialize the subscription with the user input."""
    for service_binding_port in service_binding_ports:
        edge_port_subscription = EdgePort.from_subscription(service_binding_port.pop("edge_port"))
        bgp_peers = service_binding_port.pop("bgp_peers")
        sbp_bgp_session_list = [BGPSession.new(subscription_id=uuid4(), **session) for session in bgp_peers]

        service_binding_port_subscription = ServiceBindingPortInactive.new(
            subscription_id=uuid4(),
            edge_port=edge_port_subscription.edge_port,
            sbp_bgp_session_list=sbp_bgp_session_list,
            **service_binding_port,
        )
        subscription.geant_ip.geant_ip_ap_list.append(
            NRENAccessPortInactive.new(
                subscription_id=uuid4(),
                nren_ap_type=service_binding_port["ap_type"],
                geant_ip_sbp=service_binding_port_subscription,
            )
        )

    subscription.description = "GEANT IP service"

    return {"subscription": subscription}


@workflow(
    "Import GÉANT IP",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_geant_ip() -> StepList:
    """Import a GÉANT IP without provisioning it."""
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
"""Create a new GÉANT IP subscription."""

from typing import Annotated, Any
from uuid import uuid4

from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field
from pydantic_forms.validators import Divider

from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.nren_l3_core_service import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPortInactive
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import GeantIPInactive
from gso.services.lso_client import LSOState, lso_interaction
from gso.utils.helpers import (
    active_edge_port_selector,
    partner_choice,
)
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask
from gso.utils.types.tt_number import TTNumber


def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather input from the operator to build a new subscription object."""

    class CreateGeantIPForm(FormPage):
        model_config = ConfigDict(title="GÉANT IP - Select partner")

        tt_number: TTNumber
        partner: partner_choice()  # type: ignore[valid-type]

    initial_user_input = yield CreateGeantIPForm

    class EdgePortSelection(BaseModel):
        edge_port: active_edge_port_selector(partner_id=initial_user_input.partner)  # type: ignore[valid-type]
        ap_type: APType

    def validate_edge_ports_are_unique(edge_ports: list[EdgePortSelection]) -> list[EdgePortSelection]:
        """Verify if interfaces are unique."""
        port_names = [port.edge_port for port in edge_ports]
        if len(port_names) != len(set(port_names)):
            msg = "Edge Ports must be unique."
            raise ValueError(msg)
        return edge_ports

    class EdgePortSelectionForm(FormPage):
        model_config = ConfigDict(title="GÉANT IP - Select Edge Ports")
        info_label: Label = Field(
            "Please select the Edge Ports where this GÉANT IP service will terminate", exclude=True
        )

        edge_ports: Annotated[list[EdgePortSelection], AfterValidator(validate_edge_ports_are_unique)]

    selected_edge_ports = yield EdgePortSelectionForm
    ep_list = selected_edge_ports.edge_ports

    class BaseBGPPeer(BaseModel):
        bfd_enabled: bool = False
        bfd_interval: int | None = None
        bfd_multiplier: int | None = None
        has_custom_policies: bool = False
        authentication_key: str
        multipath_enabled: bool = False
        send_default_route: bool = False
        is_passive: bool = False

    class IPv4BGPPeer(BaseBGPPeer):
        peer_address: IPv4AddressType
        add_v4_multicast: bool = Field(default=False, exclude=True)

        @computed_field  # type: ignore[misc]
        @property
        def families(self) -> list[IPFamily]:
            return [IPFamily.V4UNICAST, IPFamily.V4MULTICAST] if self.add_v4_multicast else [IPFamily.V4UNICAST]

    class IPv6BGPPeer(BaseBGPPeer):
        peer_address: IPv6AddressType
        add_v6_multicast: bool = Field(default=False, exclude=True)

        @computed_field  # type: ignore[misc]
        @property
        def families(self) -> list[IPFamily]:
            return [IPFamily.V6UNICAST, IPFamily.V6MULTICAST] if self.add_v6_multicast else [IPFamily.V6UNICAST]

    binding_port_inputs = []
    for ep_index, edge_port in enumerate(ep_list):

        class BindingPortsInputForm(FormPage):
            model_config = ConfigDict(title=f"GÉANT IP - Configure Edge Ports ({ep_index + 1}/{len(ep_list)})")
            info_label: Label = Field("Please configure the Service Binding Ports for each Edge Port.", exclude=True)
            current_ep_label: Label = Field(
                f"Currently configuring on {EdgePort.from_subscription(edge_port.edge_port).description} "
                f"(Access Port type: {edge_port.ap_type})",
                exclude=True,
            )

            geant_sid: str
            is_tagged: bool = False
            vlan_id: VLAN_ID
            ipv4_address: IPv4AddressType
            ipv4_mask: IPV4Netmask
            ipv6_address: IPv6AddressType
            ipv6_mask: IPV6Netmask
            custom_firewall_filters: bool = False
            divider: Divider = Field(None, exclude=True)
            v4_bgp_peer: IPv4BGPPeer
            v6_bgp_peer: IPv6BGPPeer

        binding_port_input_form = yield BindingPortsInputForm
        binding_port_inputs.append(
            binding_port_input_form.model_dump()
            | {
                "bgp_peers": [
                    binding_port_input_form.v4_bgp_peer.model_dump(),
                    binding_port_input_form.v6_bgp_peer.model_dump(),
                ]
            }
        )

    return (
        initial_user_input.model_dump()
        | selected_edge_ports.model_dump()
        | {"binding_port_inputs": binding_port_inputs, "product_name": product_name}
    )


@step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
    """Create a new subscription object in the database."""
    subscription = GeantIPInactive.from_product_id(product, partner)

    return {"subscription": subscription, "subscription_id": subscription.subscription_id}


@step("Initialize subscription")
def initialize_subscription(
    subscription: GeantIPInactive, edge_ports: list[dict], binding_port_inputs: list[dict]
) -> State:
    """Take all user inputs and use them to populate the subscription model."""
    edge_port_fqdn_list = []
    for edge_port_input, sbp_input in zip(edge_ports, binding_port_inputs, strict=False):
        edge_port_subscription = EdgePort.from_subscription(edge_port_input["edge_port"])
        sbp_bgp_session_list = [
            BGPSession.new(subscription_id=uuid4(), **session, rtbh_enabled=True, is_multi_hop=True)
            for session in sbp_input["bgp_peers"]
        ]
        service_binding_port = ServiceBindingPortInactive.new(
            subscription_id=uuid4(),
            **sbp_input,
            sbp_bgp_session_list=sbp_bgp_session_list,
            sbp_type=SBPType.L3,
            edge_port=edge_port_subscription.edge_port,
        )
        subscription.geant_ip.geant_ip_ap_list.append(
            NRENAccessPortInactive.new(
                subscription_id=uuid4(),
                nren_ap_type=edge_port_input["ap_type"],
                geant_ip_sbp=service_binding_port,
            )
        )
        edge_port_fqdn_list.append(edge_port_subscription.edge_port.edge_port_node.router_fqdn)

    subscription.description = "GEANT IP service"

    return {"subscription": subscription, "edge_port_fqdn_list": edge_port_fqdn_list}


@step("[DRY RUN] Deploy service binding port")
def provision_sbp_dry(
    subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str]
) -> LSOState:
    """Perform a dry run of deploying Service Binding Ports."""
    extra_vars = {
        "subscription": subscription,
        "dry_run": True,
        "verb": "deploy",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploy config for {subscription["description"]}",
    }

    return {
        "playbook_name": "manage_sbp.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }


@step("[FOR REAL] Deploy service binding port")
def provision_sbp_real(
    subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str]
) -> LSOState:
    """Deploy Service Binding Ports."""
    extra_vars = {
        "subscription": subscription,
        "dry_run": False,
        "verb": "deploy",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploy config for {subscription["description"]}",
    }

    return {
        "playbook_name": "manage_sbp.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }


@step("Check service binding port functionality")
def check_sbp_functionality(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
    """Check functionality of deployed Service Binding Ports."""
    extra_vars = {"subscription": subscription, "verb": "check"}

    return {
        "playbook_name": "manage_sbp.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }


@step("[DRY RUN] Deploy BGP peers")
def deploy_bgp_peers_dry(
    subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr
) -> LSOState:
    """Perform a dry run of deploying :term:`BGP` peers."""
    extra_vars = {
        "subscription": subscription,
        "verb": "deploy",
        "dry_run": True,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploying BGP peers for {subscription["description"]}",
    }

    return {
        "playbook_name": "manage_sbp.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }


@step("[FOR REAL] Deploy BGP peers")
def deploy_bgp_peers_real(
    subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr
) -> LSOState:
    """Deploy :term:`BGP` peers."""
    extra_vars = {
        "subscription": subscription,
        "verb": "deploy",
        "dry_run": False,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploying BGP peers for {subscription["description"]}",
    }

    return {
        "playbook_name": "manage_sbp.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }


@step("Check BGP peers")
def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
    """Check correct deployment of :term:`BGP` peers."""
    extra_vars = {"subscription": subscription, "verb": "check"}

    return {
        "playbook_name": "manage_sbp.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }


@step("Update Infoblox")
def update_dns_records(subscription: GeantIPInactive) -> State:
    """Update :term:`DNS` records in Infoblox."""
    #  TODO: implement
    return {"subscription": subscription}


@workflow(
    "Create GÉANT IP",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_geant_ip() -> StepList:
    """Create a new GÉANT IP subscription.

    * Create subscription object in the service database
    * Deploy service binding ports
    * Deploy :term:`BGP` peers
    * Update :term:`DNS` records
    * Set the subscription in a provisioning state in the database
    """
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> lso_interaction(provision_sbp_dry)
        >> lso_interaction(provision_sbp_real)
        >> lso_interaction(check_sbp_functionality)
        >> lso_interaction(deploy_bgp_peers_dry)
        >> lso_interaction(deploy_bgp_peers_real)
        >> lso_interaction(check_bgp_peers)
        >> update_dns_records
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
"""A modification workflow for migrating an ImportedGeantIP to an GeantIP subscription."""

from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products import ProductName
from gso.products.product_types.nren_l3_core_service import GeantIP, ImportedGeantIP
from gso.services.subscriptions import get_product_id_by_name


@step("Create new IP trunk subscription")
def import_geant_ip_subscription(subscription_id: UUIDstr) -> State:
    """Take an ImportedGeantIP subscription, and turn it into an GeantIP subscription."""
    old_geant_ip = ImportedGeantIP.from_subscription(subscription_id)
    new_subscription_id = get_product_id_by_name(ProductName.GEANT_IP)
    new_subscription = GeantIP.from_other_product(old_geant_ip, new_subscription_id)  # type: ignore[arg-type]

    return {"subscription": new_subscription}


@workflow("Import GÉANT IP", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_geant_ip() -> StepList:
    """Modify an ImportedGeantIP subscription into an GeantIP subscription to complete the import."""
    return init >> store_process_subscription(Target.MODIFY) >> unsync >> import_geant_ip_subscription >> resync >> done
Original line number Diff line number Diff line
"""A modification workflow that migrates a GÉANT IP subscription to a different set of Edge Ports."""

from typing import Annotated

from annotated_types import Len
from orchestrator import workflow
from orchestrator.targets import Target
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field
from pydantic_forms.core import FormPage
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import Choice, Divider

from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import GeantIP
from gso.services.subscriptions import get_active_edge_port_subscriptions
from gso.utils.types.tt_number import TTNumber


def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Gather input from the operator on what new Edge Ports this GÉANT IP should migrate to."""
    subscription = GeantIP.from_subscription(subscription_id)
    partner_id = subscription.customer_id
    edge_port_count = len(subscription.geant_ip.geant_ip_ap_list)

    def _new_edge_port_selector(pid: UUIDstr) -> Choice:
        existing_ep_name_list = [
            ap.geant_ip_sbp.edge_port.owner_subscription_id for ap in subscription.geant_ip.geant_ip_ap_list
        ]
        edge_port_subscriptions = list(
            filter(
                lambda ep: bool(ep["customer_id"] == pid) and ep["subscription_id"] not in existing_ep_name_list,
                get_active_edge_port_subscriptions(includes=["subscription_id", "description", "customer_id"]),
            )
        )

        edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions}

        return Choice(
            "Select an Edge Port",
            zip(edge_ports.keys(), edge_ports.items(), strict=True),  # type: ignore[arg-type]
        )

    class NewEdgePortSelection(BaseModel):
        old_edge_port: str
        new_edge_port: _new_edge_port_selector(partner_id) | str  # type: ignore[valid-type]

    def _validate_new_edge_ports_are_unique(edge_ports: list[NewEdgePortSelection]) -> list[NewEdgePortSelection]:
        new_edge_ports = [str(port.new_edge_port) for port in edge_ports]
        if len(new_edge_ports) != len(set(new_edge_ports)):
            msg = "New Edge Ports must be unique"
            raise ValueError(msg)
        return edge_ports

    class GeantIPEdgePortSelectionForm(FormPage):
        model_config = ConfigDict(title="Migrating GÉANT IP to a new set of Edge Ports")

        tt_number: TTNumber
        divider: Divider = Field(None, exclude=True)
        edge_port_selection: Annotated[
            list[NewEdgePortSelection],
            AfterValidator(_validate_new_edge_ports_are_unique),
            Len(min_length=edge_port_count, max_length=edge_port_count),
        ] = [  # noqa: RUF012
            NewEdgePortSelection(
                old_edge_port=f"{
                    EdgePort.from_subscription(ap.geant_ip_sbp.edge_port.owner_subscription_id).description
                } ({ap.nren_ap_type})",
                new_edge_port="",
            )
            for ap in subscription.geant_ip.geant_ip_ap_list
        ]

    ep_user_input = yield GeantIPEdgePortSelectionForm

    return {"subscription_id": subscription_id, "subscription": subscription} | ep_user_input.model_dump()


@step("Update subscription model")
def update_subscription_model(subscription: GeantIP, edge_port_selection: list[dict]) -> State:
    """Update the subscription model with the new list of Access Ports."""
    for index, selected_port in enumerate(edge_port_selection):
        subscription.geant_ip.geant_ip_ap_list[index].geant_ip_sbp.edge_port = EdgePort.from_subscription(
            selected_port["new_edge_port"]
        ).edge_port

    return {"subscription": subscription}


@workflow(
    "Migrate GÉANT IP",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.MODIFY,
)
def migrate_geant_ip() -> StepList:
    """Migrate a GÉANT IP to a new set of Edge Ports."""
    return begin >> store_process_subscription(Target.MODIFY) >> unsync >> update_subscription_model >> resync >> done
Original line number Diff line number Diff line
"""A modification workflow for a GÉANT IP subscription."""

from typing import Annotated, Any
from uuid import uuid4

from orchestrator import begin, conditional, done, step, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.workflow import StepList
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field
from pydantic_forms.types import State
from pydantic_forms.validators import Divider, Label

from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.nren_l3_core_service import NRENAccessPort
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import GeantIP
from gso.utils.helpers import active_edge_port_selector
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask


def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Get input about added, removed, and modified Access Ports."""
    subscription = GeantIP.from_subscription(subscription_id)

    class AccessPortSelection(BaseModel):
        geant_ip_ep: active_edge_port_selector(partner_id=subscription.customer_id) | str  # type: ignore[valid-type]
        nren_ap_type: APType

    def validate_edge_ports_are_unique(access_ports: list[AccessPortSelection]) -> list[AccessPortSelection]:
        """Verify if interfaces are unique."""
        edge_ports = [str(port.geant_ip_ep) for port in access_ports]
        if len(edge_ports) != len(set(edge_ports)):
            msg = "Edge Ports must be unique."
            raise ValueError(msg)
        return access_ports

    class ModifyGeantIPAccessPortsForm(FormPage):
        model_config = ConfigDict(title="Modify GÉANT IP")
        access_ports: Annotated[list[AccessPortSelection], AfterValidator(validate_edge_ports_are_unique)] = [  # noqa: RUF012
            AccessPortSelection(
                geant_ip_ep=str(access_port.geant_ip_sbp.edge_port.owner_subscription_id),
                nren_ap_type=access_port.nren_ap_type,
            )
            for access_port in subscription.geant_ip.geant_ip_ap_list
        ]

    access_port_input = yield ModifyGeantIPAccessPortsForm
    input_ap_list = access_port_input.access_ports
    input_ep_list = [str(ap.geant_ip_ep) for ap in input_ap_list]
    existing_ep_list = [
        str(ap.geant_ip_sbp.edge_port.owner_subscription_id) for ap in subscription.geant_ip.geant_ip_ap_list
    ]

    class BaseBGPPeer(BaseModel):
        bfd_enabled: bool = False
        bfd_interval: int | None = None
        bfd_multiplier: int | None = None
        has_custom_policies: bool = False
        authentication_key: str
        multipath_enabled: bool = False
        send_default_route: bool = False
        is_passive: bool = False

    class IPv4BGPPeer(BaseBGPPeer):
        peer_address: IPv4AddressType
        add_v4_multicast: bool = Field(default=False, exclude=True)

        @computed_field  # type: ignore[misc]
        @property
        def families(self) -> list[IPFamily]:
            return [IPFamily.V4UNICAST, IPFamily.V4MULTICAST] if self.add_v4_multicast else [IPFamily.V4UNICAST]

    class IPv6BGPPeer(BaseBGPPeer):
        peer_address: IPv6AddressType
        add_v6_multicast: bool = Field(default=False, exclude=True)

        @computed_field  # type: ignore[misc]
        @property
        def families(self) -> list[IPFamily]:
            return [IPFamily.V6UNICAST, IPFamily.V6MULTICAST] if self.add_v6_multicast else [IPFamily.V6UNICAST]

    #  There are three possible scenarios for Edge Ports. They can be added, removed, or their relevant SBP can be
    #  modified.
    removed_ap_list = [
        access_port.subscription_instance_id
        for access_port in subscription.geant_ip.geant_ip_ap_list
        if str(access_port.geant_ip_sbp.edge_port.owner_subscription_id) not in input_ep_list
    ]
    modified_ap_list = [
        (
            access_port,
            next(
                (
                    ap.nren_ap_type
                    for ap in input_ap_list
                    if str(ap.geant_ip_ep) == str(access_port.geant_ip_sbp.edge_port.owner_subscription_id)
                ),
                None,
            ),
        )
        for access_port in subscription.geant_ip.geant_ip_ap_list
        if str(access_port.geant_ip_sbp.edge_port.owner_subscription_id) in input_ep_list
    ]
    added_ap_list = [
        (ep, next(ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep))
        for ep in input_ep_list
        if ep not in existing_ep_list
    ]

    #  First, the user can modify existing Edge Ports
    sbp_inputs = []
    for access_port_index, ap_entry in enumerate(modified_ap_list):
        access_port, new_ap_type = ap_entry
        current_sbp = access_port.geant_ip_sbp
        v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families)
        v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families)

        class BindingPortModificationForm(FormPage):
            model_config = ConfigDict(
                title=f"GÉANT IP - Modify Edge Port configuration ({access_port_index + 1}/{len(input_ap_list)})"
            )
            current_ep_label: Label = Field(
                f"Currently configuring on {access_port.geant_ip_sbp.edge_port.description} "
                f"(Access Port type: {access_port.nren_ap_type})",
                exclude=True,
            )

            geant_sid: str = current_sbp.geant_sid
            is_tagged: bool = current_sbp.is_tagged
            # The SBP model doesn't require these three fields, but in the case of GÉANT IP this will never occur since
            # it's a layer 3 service. The ignore statements are there to put our type checker at ease.
            vlan_id: VLAN_ID = current_sbp.vlan_id  # type: ignore[assignment]
            ipv4_address: IPv4AddressType = current_sbp.ipv4_address  # type: ignore[assignment]
            ipv4_mask: IPV4Netmask = current_sbp.ipv4_mask  # type: ignore[assignment]
            ipv6_address: IPv6AddressType = current_sbp.ipv6_address  # type: ignore[assignment]
            ipv6_mask: IPV6Netmask = current_sbp.ipv6_mask  # type: ignore[assignment]
            custom_firewall_filters: bool = current_sbp.custom_firewall_filters
            divider: Divider = Field(None, exclude=True)
            v4_bgp_peer: IPv4BGPPeer = IPv4BGPPeer(
                **v4_peer.model_dump(exclude=set("families")),
                add_v4_multicast=bool(IPFamily.V4MULTICAST in v4_peer.families),
            )
            v6_bgp_peer: IPv6BGPPeer = IPv6BGPPeer(
                **v6_peer.model_dump(exclude=set("families")),
                add_v6_multicast=bool(IPFamily.V6MULTICAST in v6_peer.families),
            )

        binding_port_input_form = yield BindingPortModificationForm
        sbp_inputs.append(
            binding_port_input_form.model_dump()
            | {
                "new_ap_type": new_ap_type,
                "current_sbp_id": current_sbp.subscription_instance_id,
            }
        )

    #  Second, newly added Edge Ports are configured
    binding_port_inputs = []
    for ap_index, access_port_tuple in enumerate(added_ap_list):
        edge_port_id, ap_type = access_port_tuple

        class BindingPortInputForm(FormPage):
            model_config = ConfigDict(
                title=f"GÉANT IP - Configure new Edge Port "
                f"({len(modified_ap_list) + ap_index + 1}/{len(input_ap_list)})"
            )
            info_label: Label = Field(
                "Please configure the Service Binding Ports for each newly added Edge Port", exclude=True
            )
            current_ep_label: Label = Field(
                f"Currently configuring on {EdgePort.from_subscription(edge_port_id).description} "
                f"(Access Port type: {ap_type})",
                exclude=True,
            )

            geant_sid: str
            is_tagged: bool = False
            vlan_id: VLAN_ID
            ipv4_address: IPv4AddressType
            ipv6_address: IPv6AddressType
            custom_firewall_filters: bool = False
            divider: Divider = Field(None, exclude=True)
            v4_bgp_peer: IPv4BGPPeer
            v6_bgp_peer: IPv6BGPPeer

        binding_port_input_form = yield BindingPortInputForm
        binding_port_inputs.append(
            binding_port_input_form.model_dump()
            | {
                "bgp_peers": [
                    binding_port_input_form.v4_bgp_peer.model_dump(),
                    binding_port_input_form.v6_bgp_peer.model_dump(),
                ],
                "edge_port_id": edge_port_id,
                "ap_type": ap_type,
            }
        )

    return access_port_input.model_dump() | {
        "added_service_binding_ports": binding_port_inputs,
        "removed_access_ports": removed_ap_list,
        "modified_sbp_list": sbp_inputs,
    }


@step("Clean up removed Edge Ports")
def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]) -> State:
    """Remove old :term:`SBP` product blocks from the GÉANT IP subscription."""
    subscription.geant_ip.geant_ip_ap_list = [
        ap
        for ap in subscription.geant_ip.geant_ip_ap_list
        if str(ap.subscription_instance_id) not in removed_access_ports
    ]

    return {"subscription": subscription}


@step("Modify existing Service Binding Ports")
def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[dict[str, Any]]) -> State:
    """Update the subscription model."""
    for access_port in subscription.geant_ip.geant_ip_ap_list:
        current_sbp = access_port.geant_ip_sbp
        modified_sbp_data = next(
            sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id)
        )

        v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families)
        for attribute in modified_sbp_data["v4_bgp_peer"]:
            setattr(v4_peer, attribute, modified_sbp_data["v4_bgp_peer"][attribute])

        v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families)
        for attribute in modified_sbp_data["v6_bgp_peer"]:
            setattr(v6_peer, attribute, modified_sbp_data["v6_bgp_peer"][attribute])

        current_sbp.sbp_bgp_session_list = [v4_peer, v6_peer]
        current_sbp.vlan_id = modified_sbp_data["vlan_id"]
        current_sbp.geant_sid = modified_sbp_data["geant_sid"]
        current_sbp.is_tagged = modified_sbp_data["is_tagged"]
        current_sbp.ipv4_address = modified_sbp_data["ipv4_address"]
        current_sbp.ipv6_address = modified_sbp_data["ipv6_address"]
        current_sbp.custom_firewall_filters = modified_sbp_data["custom_firewall_filters"]
        access_port.nren_ap_type = modified_sbp_data["new_ap_type"]

    return {"subscription": subscription}


@step("Instantiate new Service Binding Ports")
def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]) -> State:
    """Add new :term:`SBP`s to the GÉANT IP subscription."""
    for sbp_input in added_service_binding_ports:
        edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"])
        sbp_bgp_session_list = [
            BGPSession.new(subscription_id=uuid4(), **session, rtbh_enabled=True, is_multi_hop=True)
            for session in sbp_input["bgp_peers"]
        ]
        service_binding_port = ServiceBindingPort.new(
            subscription_id=uuid4(),
            **sbp_input,
            sbp_bgp_session_list=sbp_bgp_session_list,
            sbp_type=SBPType.L3,
            edge_port=edge_port.edge_port,
        )
        subscription.geant_ip.geant_ip_ap_list.append(
            NRENAccessPort.new(
                subscription_id=uuid4(),
                nren_ap_type=sbp_input["ap_type"],
                geant_ip_sbp=service_binding_port,
            )
        )

    return {"subscription": subscription}


@workflow(
    "Modify GÉANT IP",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.MODIFY,
)
def modify_geant_ip() -> StepList:
    """Modify a GÉANT IP subscription."""
    access_ports_are_removed = conditional(lambda state: bool(len(state["removed_access_ports"]) > 0))
    access_ports_are_modified = conditional(lambda state: bool(len(state["modified_sbp_list"]) > 0))
    access_ports_are_added = conditional(lambda state: bool(len(state["added_service_binding_ports"]) > 0))

    return (
        begin
        >> store_process_subscription(Target.MODIFY)
        >> unsync
        >> access_ports_are_removed(remove_old_sbp_blocks)
        >> access_ports_are_modified(modify_existing_sbp_blocks)
        >> access_ports_are_added(create_new_sbp_blocks)
        >> resync
        >> done
    )
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ from typing import Self

from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
@@ -17,13 +17,13 @@ from pydantic_forms.validators import ReadOnlyField
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import RouterInactive, RouterProvisioning
from gso.products.product_types.site import Site
from gso.services import infoblox, subscriptions
from gso.services import infoblox
from gso.services.lso_client import lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.services.sharepoint import SharePointClient
from gso.settings import load_oss_params
from gso.utils.helpers import generate_fqdn, iso_from_ipv4
from gso.utils.helpers import active_site_selector, generate_fqdn, iso_from_ipv4
from gso.utils.shared_enums import Vendor
from gso.utils.types.ip_address import PortNumber
from gso.utils.types.tt_number import TTNumber
@@ -35,15 +35,6 @@ from gso.utils.workflow_steps import (
)


def _site_selector() -> Choice:
    site_subscriptions = {}
    for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"]):
        site_subscriptions[str(site["subscription_id"])] = site["description"]

    # noinspection PyTypeChecker
    return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True))  # type: ignore[arg-type]


def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather information about the new router from the operator."""

@@ -53,7 +44,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
        tt_number: TTNumber
        partner: ReadOnlyField("GEANT", default_type=str)  # type: ignore[valid-type]
        vendor: Vendor
        router_site: _site_selector()  # type: ignore[valid-type]
        router_site: active_site_selector()  # type: ignore[valid-type]
        hostname: str
        ts_port: PortNumber
        router_role: RouterRole
Original line number Diff line number Diff line
@@ -34,8 +34,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
def update_subscription_model(subscription: Router, connection_strategy: str) -> State:
    """Update the database model to reflect the new connection strategy.

    If the connection strategy is set to IN-BAND, then access_via_ts should be set to False.
    Conversely, if the connection strategy is set to OUT-OF-BAND, access_via_ts should be set to True.
    If the connection strategy is set to in-band, then access_via_ts should be set to False.
    Conversely, if the connection strategy is set to out-of-band, access_via_ts should be set to True.
    """
    subscription.router.router_access_via_ts = connection_strategy == ConnectionStrategy.OUT_OF_BAND

Original line number Diff line number Diff line
@@ -236,7 +236,7 @@ def deploy_routing_instances_real(subscription: dict[str, Any], tt_number: str,

@step("Remove ISIS overload")
def remove_isis_overload(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
    """Remove ISIS overload."""
    """Remove :term:`ISIS` overload."""
    extra_vars = {
        "dry_run": False,
        "subscription": subscription,
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ from gso.services import librenms_client
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.subscriptions import get_trunks_that_terminate_on_router
from gso.utils.helpers import generate_inventory_for_active_routers
from gso.utils.types.snmp import SNMPVersion
from gso.utils.shared_enums import SNMPVersion
from gso.utils.types.tt_number import TTNumber
from gso.utils.workflow_steps import (
    add_all_p_to_pe_dry,
Original line number Diff line number Diff line
@@ -7,23 +7,18 @@ from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from pydantic_forms.validators import Choice

from gso.services.partners import delete_partner, get_all_partners, get_partner_by_name
from gso.services.partners import delete_partner, get_partner_by_name
from gso.services.subscriptions import get_subscriptions
from gso.utils.helpers import partner_choice


def initial_input_form_generator() -> FormGenerator:
    """Gather input from the user needed for deleting a partner."""
    partners = {}
    for partner in get_all_partners():
        partners[partner["partner_id"]] = partner["name"]

    partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True))  # type: ignore[arg-type]

    class SelectPartnerForm(FormPage):
        model_config = ConfigDict(title="Delete a Partner")
        partners: partner_choice  # type: ignore[valid-type]
        partners: partner_choice()  # type: ignore[valid-type]

        @field_validator("partners")
        def validate_partners(cls, value: Enum) -> Enum:
Original line number Diff line number Diff line
@@ -5,30 +5,24 @@ from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from pydantic_forms.validators import Choice

from gso.services.partners import (
    ModifiedPartnerSchema,
    edit_partner,
    filter_partners_by_email,
    filter_partners_by_name,
    get_all_partners,
    get_partner_by_name,
)
from gso.utils.helpers import partner_choice


def initial_input_form_generator() -> FormGenerator:
    """Gather input from the user needed for modifying a partner."""
    partners = {}
    for partner in get_all_partners():
        partners[partner["partner_id"]] = partner["name"]

    partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True))  # type: ignore[arg-type]

    class SelectPartnerForm(FormPage):
        model_config = ConfigDict(title="Choose a Partner")

        partners: partner_choice  # type: ignore[valid-type]
        partners: partner_choice()  # type: ignore[valid-type]

    initial_user_input = yield SelectPartnerForm

Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ from unittest.mock import patch
import pytest

from gso.cli.imports import (
    import_edge_port,
    import_iptrunks,
    import_office_routers,
    import_opengear,
@@ -13,6 +14,7 @@ from gso.cli.imports import (
    import_super_pop_switches,
)
from gso.products import Router, Site
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
@@ -195,6 +197,41 @@ def opengear_data(temp_file, faker, site_subscription_factory):
    return _opengear_data


@pytest.fixture()
def edge_port_data(temp_file, faker, nokia_router_subscription_factory, partner_factory):
    def _edge_port_data(**kwargs):
        edge_port_data = {
            "node": Router.from_subscription(nokia_router_subscription_factory()).router.router_fqdn,
            "service_type": EdgePortType.CUSTOMER,
            "speed": PhysicalPortCapacity.TEN_GIGABIT_PER_SECOND,
            "encapsulation": EncapsulationType.DOT1Q,
            "name": "lag34",
            "minimum_links": 2,
            "geant_ga_id": faker.geant_gid(),
            "mac_address": faker.mac_address(),
            "partner": partner_factory()["name"],
            "enable_lacp": True,
            "ignore_if_down": False,
            "ae_members": [
                {
                    "interface_name": faker.network_interface(),
                    "interface_description": faker.sentence(),
                },
                {
                    "interface_name": faker.network_interface(),
                    "interface_description": faker.sentence(),
                },
            ],
            "description": faker.sentence(),
        }
        edge_port_data.update(**kwargs)

        temp_file.write_text(json.dumps([edge_port_data]))
        return {"path": str(temp_file), "data": edge_port_data}

    return _edge_port_data


###########
#  TESTS  #
###########
@@ -377,3 +414,35 @@ def test_import_super_pop_switch_success(mock_start_process, mock_sleep, super_p
def test_import_opengear_success(mock_start_process, opengear_data):
    import_opengear(opengear_data()["path"])
    assert mock_start_process.call_count == 1


@patch("gso.cli.imports.time.sleep")
@patch("gso.cli.imports.start_process")
def test_import_edge_port_successful(mock_start_process, mock_sleep, edge_port_data):
    import_edge_port(edge_port_data()["path"])
    assert mock_start_process.call_count == 1


@patch("gso.cli.imports.time.sleep")
@patch("gso.cli.imports.start_process")
def test_import_edge_port_with_invalid_router(
    mock_start_process, mock_sleep, edge_port_data, capfd, nokia_router_subscription_factory
):
    p_router = nokia_router_subscription_factory(router_role=RouterRole.P)
    broken_data = edge_port_data(node=Router.from_subscription(p_router).router.router_fqdn)
    import_edge_port(broken_data["path"])

    captured_output, _ = capfd.readouterr()
    assert f"Router {p_router} not found" in captured_output
    assert mock_start_process.call_count == 0


@patch("gso.cli.imports.time.sleep")
@patch("gso.cli.imports.start_process")
def test_import_edge_port_with_invalid_partner(mock_start_process, mock_sleep, edge_port_data, capfd):
    broken_data = edge_port_data(partner="INVALID")
    import_edge_port(broken_data["path"])

    captured_output, _ = capfd.readouterr()
    assert "Partner INVALID not found" in captured_output
    assert mock_start_process.call_count == 0
+19 −6
Original line number Diff line number Diff line
@@ -36,15 +36,19 @@ from gso.main import init_gso_app
from gso.services.partners import PartnerSchema, create_partner
from gso.utils.types.interfaces import LAGMember, LAGMemberList
from test.fixtures import (  # noqa: F401
    bgp_session_subscription_factory,
    edge_port_subscription_factory,
    geant_ip_subscription_factory,
    iptrunk_side_subscription_factory,
    iptrunk_subscription_factory,
    juniper_router_subscription_factory,
    nokia_router_subscription_factory,
    nren_access_port_factory,
    office_router_subscription_factory,
    opengear_subscription_factory,
    service_binding_port_factory,
    site_subscription_factory,
    super_pop_switch_subscription_factory,
    test_workflow,
)

logging.getLogger("faker.factory").setLevel(logging.WARNING)
@@ -104,6 +108,12 @@ class FakerProvider(BaseProvider):

        return site_name

    def ipv4_netmask(self) -> int:
        return self.generator.random_int(min=1, max=32)

    def ipv6_netmask(self) -> int:
        return self.generator.random_int(min=1, max=128)

    def network_interface(self) -> str:
        return self.generator.numerify("ge-@#/@#/@#")

@@ -122,6 +132,9 @@ class FakerProvider(BaseProvider):
            for i in range(iface_amount)
        ]

    def vlan_id(self) -> int:
        return self.generator.random_int(min=1, max=4095)


@pytest.fixture(scope="session")
def faker() -> Faker:
@@ -272,15 +285,15 @@ def test_client(fastapi_app):


@pytest.fixture(scope="session")
def partner_factory():
def partner_factory(faker):
    def _create_partner(
        name: str,
        email: str,
        name: str | None = None,
        email: str | None = None,
    ) -> dict:
        return create_partner(
            PartnerSchema(
                name=name,
                email=email,
                name=name or faker.company(),
                email=email or faker.email(),
            )
        )

test/fixtures.py

deleted100644 → 0
+0 −638

File deleted.

Preview size limit exceeded, changes collapsed.

+29 −0
Original line number Diff line number Diff line
from test.fixtures.edge_port_fixtures import edge_port_subscription_factory
from test.fixtures.geant_ip_fixtures import (
    bgp_session_subscription_factory,
    geant_ip_subscription_factory,
    nren_access_port_factory,
    service_binding_port_factory,
)
from test.fixtures.iptrunk_fixtures import iptrunk_side_subscription_factory, iptrunk_subscription_factory
from test.fixtures.office_router_fixtures import office_router_subscription_factory
from test.fixtures.opengear_fixtures import opengear_subscription_factory
from test.fixtures.router_fixtures import juniper_router_subscription_factory, nokia_router_subscription_factory
from test.fixtures.site_fixtures import site_subscription_factory
from test.fixtures.super_pop_switch_fixtures import super_pop_switch_subscription_factory

__all__ = [
    "bgp_session_subscription_factory",
    "edge_port_subscription_factory",
    "geant_ip_subscription_factory",
    "iptrunk_side_subscription_factory",
    "iptrunk_subscription_factory",
    "juniper_router_subscription_factory",
    "nokia_router_subscription_factory",
    "nren_access_port_factory",
    "office_router_subscription_factory",
    "opengear_subscription_factory",
    "service_binding_port_factory",
    "site_subscription_factory",
    "super_pop_switch_subscription_factory",
]
+81 −0
Original line number Diff line number Diff line
from typing import Any

from orchestrator.db import (
    ProductTable,
    SubscriptionInstanceTable,
    SubscriptionInstanceValueTable,
    SubscriptionTable,
    db,
)
from orchestrator.utils.datetime import nowtz
from pydantic_forms.types import SubscriptionMapping


def create_subscription_for_mapping(
    product: ProductTable, mapping: SubscriptionMapping, values: dict[str, Any], **kwargs: Any
) -> SubscriptionTable:
    """Create a subscription in the test coredb for the given subscription_mapping and values.

    This function handles optional resource types starting with a ? in the mapping not supplied in the values array.

    Args:
        product: the ProductTable to create a sub for
        mapping: the subscription_mapping belonging to that product
        values: a dictionary of keys from the sub_map and their corresponding test values
        kwargs: The rest of the arguments

    Returns: The conforming subscription.
    """

    def build_instance(name, value_mapping):
        block = product.find_block_by_name(name)

        def build_value(rt, value):
            resource_type = block.find_resource_type_by_name(rt)
            return SubscriptionInstanceValueTable(resource_type_id=resource_type.resource_type_id, value=value)

        return SubscriptionInstanceTable(
            product_block_id=block.product_block_id,
            values=[
                build_value(resource_type, values[value_key]) for (resource_type, value_key) in value_mapping.items()
            ],
        )

    # recreate the mapping: leave out the ?keys if no value supplied for them
    mapping = {
        name: [
            {
                **{k: value_map[k] for k in value_map if not value_map[k].startswith("?")},
                **{
                    k: value_map[k][1:]
                    for k in value_map
                    if value_map[k].startswith("?") and value_map[k][1:] in values
                },
            }
            for value_map in mapping[name]
        ]
        for name in mapping
    }

    instances = [
        build_instance(name, value_mapping)
        for (name, value_mappings) in mapping.items()
        for value_mapping in value_mappings
    ]

    return create_subscription(instances=instances, product=product, **kwargs)


def create_subscription(**kwargs):
    attrs = {
        "description": "A subscription.",
        "customer_id": kwargs.get("customer_id", "85938c4c-0a11-e511-80d0-005056956c1a"),
        "start_date": nowtz(),
        "status": "active",
        "insync": True,
        **kwargs,
    }
    o = SubscriptionTable(**attrs)
    db.session.add(o)
    db.session.commit()
    return o
Original line number Diff line number Diff line
@@ -12,10 +12,22 @@ class MockedNetboxClient:
    def get_device_by_name(self):
        return self.BaseMockObject(id=1, name="test")

    @staticmethod
    def get_interface_by_name_and_device(interface_name: str, device_name: str):
        return {
            "name": f"{interface_name}",
            "module": {"display": f"Module{interface_name}"},
            "description": f"Description{interface_name}-{device_name}",
        }

    @staticmethod
    def get_available_lags() -> list[str]:
        return [f"lag-{lag}" for lag in range(1, 5)]

    @staticmethod
    def get_available_services_lags() -> list[str]:
        return [f"lag-{lag}" for lag in range(21, 50)]

    @staticmethod
    def get_available_interfaces():
        interfaces = []
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ import pytest
from requests import HTTPError

from gso.services.librenms_client import LibreNMSClient
from gso.utils.types.snmp import SNMPVersion
from gso.utils.shared_enums import SNMPVersion


@pytest.fixture()