Newer
Older
import ipaddress
import pytest
from orchestrator.db import db
from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, UUIDstr
Karel van Klink
committed
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlock,
IptrunkSideBlock,
IptrunkType,
PhyPortCapacity,
)
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.products.product_types.iptrunk import IptrunkInactive
from gso.products.product_types.router import Router, RouterInactive
from gso.products.product_types.site import Site, SiteInactive
from gso.services import subscriptions
from gso.utils.shared_enums import Vendor
Karel van Klink
committed
@pytest.fixture()
def site_subscription_factory(faker, geant_partner):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
site_name=None,
site_city=None,
site_country=None,
site_country_code=None,
site_latitude=None,
site_longitude=None,
site_bgp_community_id=None,
site_internal_id=None,
site_tier=SiteTier.TIER1,
site_ts_address=None,
if partner is None:
partner = geant_partner
description = description or "Site Subscription"
site_name = site_name or faker.domain_word()
site_city = site_city or faker.city()
site_country = site_country or faker.country()
site_country_code = site_country_code or faker.country_code()
site_latitude = site_latitude or float(faker.latitude())
site_longitude = site_longitude or float(faker.longitude())
site_bgp_community_id = site_bgp_community_id or faker.pyint()
site_internal_id = site_internal_id or faker.pyint()
site_ts_address = site_ts_address or faker.ipv4()
product_id = subscriptions.get_product_id_by_name(ProductType.SITE)
site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
site_subscription.site.site_city = site_city
site_subscription.site.site_name = site_name
site_subscription.site.site_country = site_country
site_subscription.site.site_country_code = site_country_code
site_subscription.site.site_latitude = site_latitude
site_subscription.site.site_longitude = site_longitude
site_subscription.site.site_bgp_community_id = site_bgp_community_id
site_subscription.site.site_internal_id = site_internal_id
site_subscription.site.site_tier = site_tier
site_subscription.site.site_ts_address = site_ts_address
site_subscription = SubscriptionModel.from_other_lifecycle(site_subscription, SubscriptionLifecycle.ACTIVE)
site_subscription.description = description
site_subscription.start_date = start_date
site_subscription.save()
db.session.commit()
return str(site_subscription.subscription_id)
return subscription_create
Karel van Klink
committed
@pytest.fixture()
def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
router_fqdn=None,
router_ts_port=None,
router_access_via_ts=None,
router_lo_ipv4_address=None,
router_lo_ipv6_address=None,
router_lo_iso_address=None,
router_role=RouterRole.PE,
router_site=None,
status: SubscriptionLifecycle | None = None,
if partner is None:
partner = geant_partner
description = description or faker.text(max_nb_chars=30)
router_fqdn = router_fqdn or faker.domain_name(levels=4)
router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
router_access_via_ts = router_access_via_ts or faker.boolean()
router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
router_lo_iso_address = router_lo_iso_address or faker.word()
router_site = router_site or site_subscription_factory()
product_id = subscriptions.get_product_id_by_name(ProductType.ROUTER)
router_subscription = RouterInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
router_subscription.router.router_fqdn = router_fqdn
router_subscription.router.router_ts_port = router_ts_port
router_subscription.router.router_access_via_ts = router_access_via_ts
router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
router_subscription.router.router_lo_iso_address = router_lo_iso_address
router_subscription.router.router_role = router_role
router_subscription.router.router_site = Site.from_subscription(router_site).site
router_subscription.router.vendor = Vendor.NOKIA
router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
router_subscription.description = description
router_subscription.start_date = start_date
if status:
router_subscription.status = status
router_subscription.save()
db.session.commit()
return str(router_subscription.subscription_id)
return subscription_create
@pytest.fixture()
def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
router_fqdn=None,
router_ts_port=None,
router_access_via_ts=None,
router_lo_ipv4_address=None,
router_lo_ipv6_address=None,
router_lo_iso_address=None,
router_role=RouterRole.PE,
router_site=None,
status: SubscriptionLifecycle | None = None,
if partner is None:
partner = geant_partner
description = description or faker.text(max_nb_chars=30)
router_fqdn = router_fqdn or faker.domain_name(levels=4)
router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
router_access_via_ts = router_access_via_ts or faker.boolean()
router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
router_lo_iso_address = router_lo_iso_address or faker.word()
router_site = router_site or site_subscription_factory()
product_id = subscriptions.get_product_id_by_name(ProductType.ROUTER)
router_subscription = RouterInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
router_subscription.router.router_fqdn = router_fqdn
router_subscription.router.router_ts_port = router_ts_port
router_subscription.router.router_access_via_ts = router_access_via_ts
router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
router_subscription.router.router_lo_iso_address = router_lo_iso_address
router_subscription.router.router_role = router_role
router_subscription.router.router_site = Site.from_subscription(router_site).site
router_subscription.router.vendor = Vendor.JUNIPER
router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
router_subscription.description = description
router_subscription.start_date = start_date
if status:
router_subscription.status = status
router_subscription.save()
db.session.commit()
return str(router_subscription.subscription_id)
return subscription_create
Karel van Klink
committed
@pytest.fixture()
def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker):
def subscription_create(
iptrunk_side_node=None,
iptrunk_side_ae_iface=None,
iptrunk_side_ae_geant_a_sid=None,
iptrunk_side_ae_members=None,
iptrunk_side_ae_members_description=None,
) -> IptrunkSideBlock:
iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory()
iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router
iptrunk_side_ae_iface = iptrunk_side_ae_iface or faker.pystr()
iptrunk_side_ae_geant_a_sid = iptrunk_side_ae_geant_a_sid or faker.geant_sid()
Karel van Klink
committed
iptrunk_side_ae_members = iptrunk_side_ae_members or [
IptrunkInterfaceBlock.new(
Karel van Klink
committed
faker.uuid4(),
interface_name=faker.network_interface(),
interface_description=faker.sentence(),
Karel van Klink
committed
),
IptrunkInterfaceBlock.new(
Karel van Klink
committed
faker.uuid4(),
interface_name=faker.network_interface(),
interface_description=faker.sentence(),
Karel van Klink
committed
),
]
return IptrunkSideBlock.new(
faker.uuid4(),
iptrunk_side_node=iptrunk_side_node,
iptrunk_side_ae_iface=iptrunk_side_ae_iface,
iptrunk_side_ae_geant_a_sid=iptrunk_side_ae_geant_a_sid,
iptrunk_side_ae_members=iptrunk_side_ae_members,
iptrunk_side_ae_members_description=iptrunk_side_ae_members_description,
)
return subscription_create
Karel van Klink
committed
@pytest.fixture()
def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
geant_s_sid=None,
iptrunk_description=None,
iptrunk_type=IptrunkType.DARK_FIBER,
iptrunk_speed=PhyPortCapacity.ONE_GIGABIT_PER_SECOND,
iptrunk_isis_metric=None,
iptrunk_ipv4_network=None,
iptrunk_ipv6_network=None,
iptrunk_sides=None,
Karel van Klink
committed
status: SubscriptionLifecycle | None = None,
if partner is None:
partner = geant_partner
product_id = subscriptions.get_product_id_by_name(ProductType.IP_TRUNK)
description = description or faker.sentence()
geant_s_sid = geant_s_sid or faker.geant_sid()
iptrunk_description = iptrunk_description or faker.sentence()
iptrunk_isis_metric = iptrunk_isis_metric or faker.pyint()
iptrunk_ipv4_network = iptrunk_ipv4_network or faker.ipv4(network=True)
iptrunk_ipv6_network = iptrunk_ipv6_network or faker.ipv6(network=True)
iptrunk_minimum_links = 1
iptrunk_side_a = iptrunk_side_subscription_factory()
iptrunk_side_b = iptrunk_side_subscription_factory()
iptrunk_sides = iptrunk_sides or [iptrunk_side_a, iptrunk_side_b]
iptrunk_subscription = IptrunkInactive.from_product_id(
product_id, customer_id=partner["partner_id"], insync=True
)
iptrunk_subscription.iptrunk.geant_s_sid = geant_s_sid
iptrunk_subscription.iptrunk.iptrunk_description = iptrunk_description
iptrunk_subscription.iptrunk.iptrunk_type = iptrunk_type
iptrunk_subscription.iptrunk.iptrunk_speed = iptrunk_speed
iptrunk_subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
iptrunk_subscription.iptrunk.iptrunk_isis_metric = iptrunk_isis_metric
iptrunk_subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
iptrunk_subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
iptrunk_subscription.iptrunk.iptrunk_sides = iptrunk_sides
iptrunk_subscription = SubscriptionModel.from_other_lifecycle(
Karel van Klink
committed
iptrunk_subscription,
SubscriptionLifecycle.ACTIVE,
Karel van Klink
committed
if status:
iptrunk_subscription.status = status
iptrunk_subscription.description = description
iptrunk_subscription.start_date = start_date
iptrunk_subscription.save()
db.session.commit()
return str(iptrunk_subscription.subscription_id)
return subscription_create