Newer
Older
import ipaddress
import pytest
from orchestrator.db import db
from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, UUIDstr
Karel van Klink
committed
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlock,
IptrunkSideBlock,
IptrunkType,
PhyPortCapacity,
)
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.products.product_types.iptrunk import IptrunkInactive
from gso.products.product_types.router import Router, RouterInactive
from gso.products.product_types.site import Site, SiteInactive
from gso.services import subscriptions
from gso.utils.shared_enums import Vendor
CUSTOMER_ID: UUIDstr = "2f47f65a-0911-e511-80d0-005056956c1a"
Karel van Klink
committed
@pytest.fixture()
def site_subscription_factory(faker):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
site_name=None,
site_city=None,
site_country=None,
site_country_code=None,
site_latitude=None,
site_longitude=None,
site_bgp_community_id=None,
site_internal_id=None,
site_tier=SiteTier.TIER1,
site_ts_address=None,
) -> UUIDstr:
description = description or "Site Subscription"
site_name = site_name or faker.domain_word()
site_city = site_city or faker.city()
site_country = site_country or faker.country()
site_country_code = site_country_code or faker.country_code()
site_latitude = site_latitude or float(faker.latitude())
site_longitude = site_longitude or float(faker.longitude())
site_bgp_community_id = site_bgp_community_id or faker.pyint()
site_internal_id = site_internal_id or faker.pyint()
site_ts_address = site_ts_address or faker.ipv4()
product_id = subscriptions.get_product_id_by_name(ProductType.SITE)
site_subscription = SiteInactive.from_product_id(product_id, customer_id=CUSTOMER_ID, insync=True)
site_subscription.site.site_city = site_city
site_subscription.site.site_name = site_name
site_subscription.site.site_country = site_country
site_subscription.site.site_country_code = site_country_code
site_subscription.site.site_latitude = site_latitude
site_subscription.site.site_longitude = site_longitude
site_subscription.site.site_bgp_community_id = site_bgp_community_id
site_subscription.site.site_internal_id = site_internal_id
site_subscription.site.site_tier = site_tier
site_subscription.site.site_ts_address = site_ts_address
site_subscription = SubscriptionModel.from_other_lifecycle(site_subscription, SubscriptionLifecycle.ACTIVE)
site_subscription.description = description
site_subscription.start_date = start_date
site_subscription.save()
db.session.commit()
return str(site_subscription.subscription_id)
return subscription_create
Karel van Klink
committed
@pytest.fixture()
def nokia_router_subscription_factory(site_subscription_factory, faker):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
router_fqdn=None,
router_ts_port=None,
router_access_via_ts=None,
router_lo_ipv4_address=None,
router_lo_ipv6_address=None,
router_lo_iso_address=None,
router_role=RouterRole.PE,
router_site=None,
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
description = description or faker.text(max_nb_chars=30)
router_fqdn = router_fqdn or faker.domain_name(levels=4)
router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
router_access_via_ts = router_access_via_ts or faker.boolean()
router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
router_lo_iso_address = router_lo_iso_address or faker.word()
router_site = router_site or site_subscription_factory()
product_id = subscriptions.get_product_id_by_name(ProductType.ROUTER)
router_subscription = RouterInactive.from_product_id(product_id, customer_id=CUSTOMER_ID, insync=True)
router_subscription.router.router_fqdn = router_fqdn
router_subscription.router.router_ts_port = router_ts_port
router_subscription.router.router_access_via_ts = router_access_via_ts
router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
router_subscription.router.router_lo_iso_address = router_lo_iso_address
router_subscription.router.router_role = router_role
router_subscription.router.router_site = Site.from_subscription(router_site).site
router_subscription.router.vendor = Vendor.NOKIA
router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
router_subscription.description = description
router_subscription.start_date = start_date
if status:
router_subscription.status = status
router_subscription.save()
db.session.commit()
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
return str(router_subscription.subscription_id)
return subscription_create
@pytest.fixture()
def juniper_router_subscription_factory(site_subscription_factory, faker):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
router_fqdn=None,
router_ts_port=None,
router_access_via_ts=None,
router_lo_ipv4_address=None,
router_lo_ipv6_address=None,
router_lo_iso_address=None,
router_role=RouterRole.PE,
router_site=None,
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
description = description or faker.text(max_nb_chars=30)
router_fqdn = router_fqdn or faker.domain_name(levels=4)
router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
router_access_via_ts = router_access_via_ts or faker.boolean()
router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
router_lo_iso_address = router_lo_iso_address or faker.word()
router_site = router_site or site_subscription_factory()
product_id = subscriptions.get_product_id_by_name(ProductType.ROUTER)
router_subscription = RouterInactive.from_product_id(product_id, customer_id=CUSTOMER_ID, insync=True)
router_subscription.router.router_fqdn = router_fqdn
router_subscription.router.router_ts_port = router_ts_port
router_subscription.router.router_access_via_ts = router_access_via_ts
router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
router_subscription.router.router_lo_iso_address = router_lo_iso_address
router_subscription.router.router_role = router_role
router_subscription.router.router_site = Site.from_subscription(router_site).site
router_subscription.router.vendor = Vendor.JUNIPER
router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
router_subscription.description = description
router_subscription.start_date = start_date
if status:
router_subscription.status = status
router_subscription.save()
db.session.commit()
return str(router_subscription.subscription_id)
return subscription_create
Karel van Klink
committed
@pytest.fixture()
def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker):
def subscription_create(
iptrunk_side_node=None,
iptrunk_side_ae_iface=None,
iptrunk_side_ae_geant_a_sid=None,
iptrunk_side_ae_members=None,
iptrunk_side_ae_members_description=None,
) -> IptrunkSideBlock:
iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory()
iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router
iptrunk_side_ae_iface = iptrunk_side_ae_iface or faker.pystr()
iptrunk_side_ae_geant_a_sid = iptrunk_side_ae_geant_a_sid or faker.geant_sid()
Karel van Klink
committed
iptrunk_side_ae_members = iptrunk_side_ae_members or [
IptrunkInterfaceBlock.new(
Karel van Klink
committed
faker.uuid4(),
interface_name=faker.network_interface(),
interface_description=faker.sentence(),
Karel van Klink
committed
),
IptrunkInterfaceBlock.new(
Karel van Klink
committed
faker.uuid4(),
interface_name=faker.network_interface(),
interface_description=faker.sentence(),
Karel van Klink
committed
),
]
return IptrunkSideBlock.new(
faker.uuid4(),
iptrunk_side_node=iptrunk_side_node,
iptrunk_side_ae_iface=iptrunk_side_ae_iface,
iptrunk_side_ae_geant_a_sid=iptrunk_side_ae_geant_a_sid,
iptrunk_side_ae_members=iptrunk_side_ae_members,
iptrunk_side_ae_members_description=iptrunk_side_ae_members_description,
)
return subscription_create
Karel van Klink
committed
@pytest.fixture()
def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker):
def subscription_create(
description=None,
start_date="2023-05-24T00:00:00+00:00",
geant_s_sid=None,
iptrunk_description=None,
iptrunk_type=IptrunkType.DARK_FIBER,
iptrunk_speed=PhyPortCapacity.ONE_GIGABIT_PER_SECOND,
iptrunk_isis_metric=None,
iptrunk_ipv4_network=None,
iptrunk_ipv6_network=None,
iptrunk_sides=None,
Karel van Klink
committed
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
product_id = subscriptions.get_product_id_by_name(ProductType.IP_TRUNK)
description = description or faker.sentence()
geant_s_sid = geant_s_sid or faker.geant_sid()
iptrunk_description = iptrunk_description or faker.sentence()
iptrunk_isis_metric = iptrunk_isis_metric or faker.pyint()
iptrunk_ipv4_network = iptrunk_ipv4_network or faker.ipv4(network=True)
iptrunk_ipv6_network = iptrunk_ipv6_network or faker.ipv6(network=True)
iptrunk_minimum_links = 1
iptrunk_side_a = iptrunk_side_subscription_factory()
iptrunk_side_b = iptrunk_side_subscription_factory()
iptrunk_sides = iptrunk_sides or [iptrunk_side_a, iptrunk_side_b]
iptrunk_subscription = IptrunkInactive.from_product_id(product_id, customer_id=CUSTOMER_ID, insync=True)
iptrunk_subscription.iptrunk.geant_s_sid = geant_s_sid
iptrunk_subscription.iptrunk.iptrunk_description = iptrunk_description
iptrunk_subscription.iptrunk.iptrunk_type = iptrunk_type
iptrunk_subscription.iptrunk.iptrunk_speed = iptrunk_speed
iptrunk_subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
iptrunk_subscription.iptrunk.iptrunk_isis_metric = iptrunk_isis_metric
iptrunk_subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
iptrunk_subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
iptrunk_subscription.iptrunk.iptrunk_sides = iptrunk_sides
iptrunk_subscription = SubscriptionModel.from_other_lifecycle(
Karel van Klink
committed
iptrunk_subscription,
SubscriptionLifecycle.ACTIVE,
Karel van Klink
committed
if status:
iptrunk_subscription.status = status
iptrunk_subscription.description = description
iptrunk_subscription.start_date = start_date
iptrunk_subscription.save()
db.session.commit()
return str(iptrunk_subscription.subscription_id)
return subscription_create