Skip to content
Snippets Groups Projects
Verified Commit 9addd138 authored by Neda Moeini's avatar Neda Moeini Committed by Karel van Klink
Browse files

Geant IP fixtures.

parent c3e80d38
Branches
Tags
1 merge request!286Add Edge Port, GÉANT IP and IAS products
...@@ -36,13 +36,17 @@ from gso.main import init_gso_app ...@@ -36,13 +36,17 @@ from gso.main import init_gso_app
from gso.services.partners import PartnerSchema, create_partner from gso.services.partners import PartnerSchema, create_partner
from gso.utils.types.interfaces import LAGMember, LAGMemberList from gso.utils.types.interfaces import LAGMember, LAGMemberList
from test.fixtures import ( # noqa: F401 from test.fixtures import ( # noqa: F401
bgp_session_subscription_factory,
edge_port_subscription_factory, edge_port_subscription_factory,
geant_ip_subscription_factory,
iptrunk_side_subscription_factory, iptrunk_side_subscription_factory,
iptrunk_subscription_factory, iptrunk_subscription_factory,
nren_access_port_factory,
office_router_subscription_factory, office_router_subscription_factory,
opengear_subscription_factory, opengear_subscription_factory,
router_subscription_factory, router_subscription_factory,
site_subscription_factory, site_subscription_factory,
service_binding_port_factory,
super_pop_switch_subscription_factory, super_pop_switch_subscription_factory,
) )
......
from test.fixtures.edge_port_fixtures import edge_port_subscription_factory from test.fixtures.edge_port_fixtures import edge_port_subscription_factory
from test.fixtures.geant_ip_fixtures import (
bgp_session_subscription_factory,
geant_ip_subscription_factory,
nren_access_port_factory,
service_binding_port_factory,
)
from test.fixtures.iptrunk_fixtures import iptrunk_side_subscription_factory, iptrunk_subscription_factory from test.fixtures.iptrunk_fixtures import iptrunk_side_subscription_factory, iptrunk_subscription_factory
from test.fixtures.office_router_fixtures import office_router_subscription_factory from test.fixtures.office_router_fixtures import office_router_subscription_factory
from test.fixtures.opengear_fixtures import opengear_subscription_factory from test.fixtures.opengear_fixtures import opengear_subscription_factory
...@@ -16,4 +22,8 @@ __all__ = [ ...@@ -16,4 +22,8 @@ __all__ = [
"opengear_subscription_factory", "opengear_subscription_factory",
"site_subscription_factory", "site_subscription_factory",
"super_pop_switch_subscription_factory", "super_pop_switch_subscription_factory",
"geant_ip_subscription_factory",
"bgp_session_subscription_factory",
"service_binding_port_factory",
"nren_access_port_factory",
] ]
import random
from uuid import uuid4
import pytest
from orchestrator.db import db
from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, UUIDstr
from gso.products import EdgePort, ProductName
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.geant_ip import NRENAccessPort
from gso.products.product_blocks.service_binding_port import ServiceBindingPort
from gso.products.product_types.geant_ip import GeantIPInactive
from gso.services import subscriptions
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPAddress
@pytest.fixture()
def bgp_session_subscription_factory(faker):
def create_bgp_session(
peer_address: IPAddress | None = None,
bfd_interval: int = 2,
bfd_multiplier: int = 2,
families: list[IPFamily] | None = None,
authentication_key: str | None = None,
*,
is_multi_hop: bool = False,
has_custom_policies: bool = False,
bfd_enabled: bool = True,
multipath_enabled: bool | None = True,
send_default_route: bool | None = True,
):
return BGPSession.new(
subscription_id=uuid4(),
peer_address=peer_address or faker.ipv4(),
bfd_enabled=bfd_enabled,
families=families or [IPFamily.V4UNICAST],
has_custom_policies=has_custom_policies,
authentication_key=authentication_key or faker.password(),
multipath_enabled=multipath_enabled,
send_default_route=send_default_route,
is_multi_hop=is_multi_hop,
bfd_interval=bfd_interval,
bfd_multiplier=bfd_multiplier,
)
return create_bgp_session
@pytest.fixture()
def service_binding_port_factory(faker, bgp_session_subscription_factory):
def create_service_binding_port(
sbp_bgp_session_list: list | None = None,
geant_sid: str | None = None,
sbp_type: SBPType = SBPType.L3,
ipv4_address: str | None = None,
ipv6_address: str | None = None,
vlan_id: int | None = None,
*,
custom_firewall_filters: bool = False,
is_tagged: bool = False,
):
return ServiceBindingPort.new(
subscription_id=uuid4(),
is_tagged=is_tagged,
vlan_id=vlan_id or faker.pyint(min_value=1, max_value=4096),
sbp_type=sbp_type,
ipv4_address=ipv4_address or faker.ipv4(),
ipv6_address=ipv6_address or faker.ipv6(),
custom_firewall_filters=custom_firewall_filters,
geant_sid=geant_sid or faker.geant_sid(),
sbp_bgp_session_list=sbp_bgp_session_list or [bgp_session_subscription_factory() for _ in range(2)],
)
return create_service_binding_port
@pytest.fixture()
def nren_access_port_factory(faker, edge_port_subscription_factory):
def create_nren_access_port(
nren_ap_type: APType | None = None,
edge_port: UUIDstr | None = None,
):
edge_port = edge_port or edge_port_subscription_factory()
geant_ip_ep = EdgePort.from_subscription(edge_port).edge_port
return NRENAccessPort.new(
subscription_id=uuid4(),
nren_ap_type=nren_ap_type or random.choice(list(APType)), # noqa: S311
geant_ip_ep=geant_ip_ep,
)
return create_nren_access_port
@pytest.fixture()
def geant_ip_subscription_factory(
faker,
partner_factory,
edge_port_subscription_factory,
bgp_session_subscription_factory,
service_binding_port_factory,
nren_access_port_factory,
):
def create_geant_ip_subscription(
description=None,
partner: dict | None = None,
nren_ap_list: list[NRENAccessPort] | None = None,
start_date="2023-05-24T00:00:00+00:00",
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
product_id = subscriptions.get_product_id_by_name(ProductName.GEANT_IP)
partner = partner or partner_factory(name=faker.company(), email=faker.email())
# Create GEANT IP subscription with product_id and partner details
geant_ip_subscription = GeantIPInactive.from_product_id(
product_id, customer_id=partner["partner_id"], insync=True
)
# Default nren_ap_list creation with primary and backup access ports
nren_ap_list = nren_ap_list or [
nren_access_port_factory(nren_ap_type=APType.PRIMARY,
edge_port=edge_port_subscription_factory(partner=partner)),
nren_access_port_factory(nren_ap_type=APType.BACKUP,
edge_port=edge_port_subscription_factory(partner=partner)),
]
# Assign and save edge port and service binding ports
for nren_ap in nren_ap_list:
edge_port = nren_ap.geant_ip_ep
edge_port.edge_port_sbp_list = [
service_binding_port_factory(),
service_binding_port_factory(),
]
# Update subscription with description, start date, and status
geant_ip_subscription = SubscriptionModel.from_other_lifecycle(
geant_ip_subscription,
SubscriptionLifecycle.ACTIVE,
)
if status:
geant_ip_subscription.status = status
geant_ip_subscription.description = description or faker.text(max_nb_chars=30)
geant_ip_subscription.start_date = start_date
geant_ip_subscription.status = status or SubscriptionLifecycle.ACTIVE
geant_ip_subscription.save()
db.session.commit()
return str(geant_ip_subscription.subscription_id)
return create_geant_ip_subscription
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment