Newer
Older
import ipaddress
import pytest
from orchestrator.db import db
from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, UUIDstr

Mohammad Torkashvand
committed
from gso.products import ProductName
Karel van Klink
committed
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlock,
IptrunkSideBlock,
IptrunkType,
Karel van Klink
committed
)
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.products.product_types.iptrunk import IptrunkInactive

Mohammad Torkashvand
committed
from gso.products.product_types.office_router import OfficeRouterInactive
from gso.products.product_types.router import Router, RouterInactive
from gso.products.product_types.site import Site, SiteInactive

Mohammad Torkashvand
committed
from gso.products.product_types.super_pop_switch import SuperPopSwitchInactive
from gso.services import subscriptions
from gso.utils.shared_enums import Vendor
Karel van Klink
committed
@pytest.fixture()
def site_subscription_factory(faker, geant_partner):
    """Provide a factory that creates and persists site subscriptions.

    The returned callable fills every site attribute that is not supplied with a
    Faker-generated value, stores the subscription and commits the DB session.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        site_name=None,
        site_city=None,
        site_country=None,
        site_country_code=None,
        site_latitude=None,
        site_longitude=None,
        site_bgp_community_id=None,
        site_internal_id=None,
        site_tier=SiteTier.TIER1,
        site_ts_address=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create a site subscription and return its subscription ID as a string.

        :param status: When truthy, overrides the status set by the transition to ``ACTIVE``.
        :param partner: Mapping with a ``"partner_id"`` key; defaults to the ``geant_partner`` fixture.
        """
        if partner is None:
            partner = geant_partner

        description = description or "Site Subscription"
        site_name = site_name or faker.domain_word()
        site_city = site_city or faker.city()
        site_country = site_country or faker.country()
        site_country_code = site_country_code or faker.country_code()
        # Compare against None rather than relying on truthiness: 0.0 is a valid coordinate
        # and must not be replaced by a random one.
        site_latitude = float(faker.latitude()) if site_latitude is None else site_latitude
        site_longitude = float(faker.longitude()) if site_longitude is None else site_longitude
        site_bgp_community_id = site_bgp_community_id or faker.pyint()
        site_internal_id = site_internal_id or faker.pyint()
        site_ts_address = site_ts_address or faker.ipv4()

        product_id = subscriptions.get_product_id_by_name(ProductName.SITE)
        site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
        site_subscription.site.site_city = site_city
        site_subscription.site.site_name = site_name
        site_subscription.site.site_country = site_country
        site_subscription.site.site_country_code = site_country_code
        site_subscription.site.site_latitude = site_latitude
        site_subscription.site.site_longitude = site_longitude
        site_subscription.site.site_bgp_community_id = site_bgp_community_id
        site_subscription.site.site_internal_id = site_internal_id
        site_subscription.site.site_tier = site_tier
        site_subscription.site.site_ts_address = site_ts_address

        site_subscription = SubscriptionModel.from_other_lifecycle(site_subscription, SubscriptionLifecycle.ACTIVE)
        site_subscription.description = description
        site_subscription.start_date = start_date

        if status:
            site_subscription.status = status

        site_subscription.save()
        db.session.commit()

        return str(site_subscription.subscription_id)

    return subscription_create
Karel van Klink
committed
@pytest.fixture()
def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Provide a factory that creates and persists router subscriptions with vendor Nokia.

    Attributes that are not supplied are filled with Faker-generated values; when no
    site is given, a new one is created through ``site_subscription_factory``.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        router_fqdn=None,
        router_ts_port=None,
        router_access_via_ts=None,
        router_lo_ipv4_address=None,
        router_lo_ipv6_address=None,
        router_lo_iso_address=None,
        router_role=RouterRole.PE,
        router_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create a Nokia router subscription and return its subscription ID as a string.

        :param router_site: Subscription ID of an existing site, passed to ``Site.from_subscription``.
        :param status: When truthy, overrides the status set by the transition to ``ACTIVE``.
        :param partner: Mapping with a ``"partner_id"`` key; defaults to the ``geant_partner`` fixture.
        """
        if partner is None:
            partner = geant_partner

        description = description or faker.text(max_nb_chars=30)
        router_fqdn = router_fqdn or faker.domain_name(levels=4)
        router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
        # An explicit ``False`` is a deliberate choice by the caller; only randomise when unset.
        router_access_via_ts = faker.boolean() if router_access_via_ts is None else router_access_via_ts
        router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
        router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
        router_lo_iso_address = router_lo_iso_address or faker.word()
        router_site = router_site or site_subscription_factory()

        product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER)
        router_subscription = RouterInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
        router_subscription.router.router_fqdn = router_fqdn
        router_subscription.router.router_ts_port = router_ts_port
        router_subscription.router.router_access_via_ts = router_access_via_ts
        router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
        router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
        router_subscription.router.router_lo_iso_address = router_lo_iso_address
        router_subscription.router.router_role = router_role
        router_subscription.router.router_site = Site.from_subscription(router_site).site
        router_subscription.router.vendor = Vendor.NOKIA

        router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
        router_subscription.description = description
        router_subscription.start_date = start_date

        if status:
            router_subscription.status = status

        router_subscription.save()
        db.session.commit()

        return str(router_subscription.subscription_id)

    return subscription_create
@pytest.fixture()
def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Provide a factory that creates and persists router subscriptions with vendor Juniper.

    Attributes that are not supplied are filled with Faker-generated values; when no
    site is given, a new one is created through ``site_subscription_factory``.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        router_fqdn=None,
        router_ts_port=None,
        router_access_via_ts=None,
        router_lo_ipv4_address=None,
        router_lo_ipv6_address=None,
        router_lo_iso_address=None,
        router_role=RouterRole.PE,
        router_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create a Juniper router subscription and return its subscription ID as a string.

        :param router_site: Subscription ID of an existing site, passed to ``Site.from_subscription``.
        :param status: When truthy, overrides the status set by the transition to ``ACTIVE``.
        :param partner: Mapping with a ``"partner_id"`` key; defaults to the ``geant_partner`` fixture.
        """
        if partner is None:
            partner = geant_partner

        description = description or faker.text(max_nb_chars=30)
        router_fqdn = router_fqdn or faker.domain_name(levels=4)
        router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
        # An explicit ``False`` is a deliberate choice by the caller; only randomise when unset.
        router_access_via_ts = faker.boolean() if router_access_via_ts is None else router_access_via_ts
        router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
        router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
        router_lo_iso_address = router_lo_iso_address or faker.word()
        router_site = router_site or site_subscription_factory()

        product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER)
        router_subscription = RouterInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
        router_subscription.router.router_fqdn = router_fqdn
        router_subscription.router.router_ts_port = router_ts_port
        router_subscription.router.router_access_via_ts = router_access_via_ts
        router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
        router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
        router_subscription.router.router_lo_iso_address = router_lo_iso_address
        router_subscription.router.router_role = router_role
        router_subscription.router.router_site = Site.from_subscription(router_site).site
        router_subscription.router.vendor = Vendor.JUNIPER

        router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
        router_subscription.description = description
        router_subscription.start_date = start_date

        if status:
            router_subscription.status = status

        router_subscription.save()
        db.session.commit()

        return str(router_subscription.subscription_id)

    return subscription_create
Karel van Klink
committed
@pytest.fixture()
def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker):
    """Provide a factory that builds ``IptrunkSideBlock`` instances.

    When no node is supplied, a router is created through
    ``nokia_router_subscription_factory``.
    """

    def subscription_create(
        iptrunk_side_node=None,
        iptrunk_side_ae_iface=None,
        iptrunk_side_ae_geant_a_sid=None,
        iptrunk_side_ae_members=None,
        iptrunk_side_ae_members_description=None,
    ) -> IptrunkSideBlock:
        """Build one side of an IP trunk.

        :param iptrunk_side_node: Subscription ID of an existing router, passed to ``Router.from_subscription``.
        :param iptrunk_side_ae_members: Member interface blocks; two generated ones are used when falsy.
        :param iptrunk_side_ae_members_description: Passed through unchanged (``None`` by default).
        """
        iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory()
        iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router
        iptrunk_side_ae_iface = iptrunk_side_ae_iface or faker.pystr()
        iptrunk_side_ae_geant_a_sid = iptrunk_side_ae_geant_a_sid or faker.geant_sid()
        # The default members are only generated when the caller did not provide any.
        iptrunk_side_ae_members = iptrunk_side_ae_members or [
            IptrunkInterfaceBlock.new(
                faker.uuid4(),
                interface_name=faker.network_interface(),
                interface_description=faker.sentence(),
            ),
            IptrunkInterfaceBlock.new(
                faker.uuid4(),
                interface_name=faker.network_interface(),
                interface_description=faker.sentence(),
            ),
        ]

        return IptrunkSideBlock.new(
            faker.uuid4(),
            iptrunk_side_node=iptrunk_side_node,
            iptrunk_side_ae_iface=iptrunk_side_ae_iface,
            iptrunk_side_ae_geant_a_sid=iptrunk_side_ae_geant_a_sid,
            iptrunk_side_ae_members=iptrunk_side_ae_members,
            iptrunk_side_ae_members_description=iptrunk_side_ae_members_description,
        )

    return subscription_create
Karel van Klink
committed
@pytest.fixture()
def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner):
    """Provide a factory that creates and persists IP trunk subscriptions.

    Attributes that are not supplied are filled with Faker-generated values, and
    trunk sides come from ``iptrunk_side_subscription_factory`` unless given.
    """

    # NOTE(review): ``PhysicalPortCapacity`` is used as a default below but is not among the
    # imports visible at the top of this file - confirm it is imported.
    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        geant_s_sid=None,
        iptrunk_description=None,
        iptrunk_type=IptrunkType.DARK_FIBER,
        iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND,
        iptrunk_isis_metric=None,
        iptrunk_ipv4_network=None,
        iptrunk_ipv6_network=None,
        iptrunk_sides=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create an IP trunk subscription and return its subscription ID as a string.

        :param iptrunk_sides: Side blocks to attach; two generated sides are used when falsy.
        :param status: When truthy, overrides the status set by the transition to ``ACTIVE``.
        :param partner: Mapping with a ``"partner_id"`` key; defaults to the ``geant_partner`` fixture.
        """
        if partner is None:
            partner = geant_partner

        product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK)
        description = description or faker.sentence()
        geant_s_sid = geant_s_sid or faker.geant_sid()
        iptrunk_description = iptrunk_description or faker.sentence()
        iptrunk_isis_metric = iptrunk_isis_metric or faker.pyint()
        iptrunk_ipv4_network = iptrunk_ipv4_network or faker.ipv4_network(max_subnet=31)
        iptrunk_ipv6_network = iptrunk_ipv6_network or faker.ipv6_network(max_subnet=126)
        iptrunk_minimum_links = 1
        # Both default sides are always built, even when ``iptrunk_sides`` is supplied by the caller.
        iptrunk_side_a = iptrunk_side_subscription_factory()
        iptrunk_side_b = iptrunk_side_subscription_factory()
        iptrunk_sides = iptrunk_sides or [iptrunk_side_a, iptrunk_side_b]

        iptrunk_subscription = IptrunkInactive.from_product_id(
            product_id, customer_id=partner["partner_id"], insync=True
        )
        iptrunk_subscription.iptrunk.geant_s_sid = geant_s_sid
        iptrunk_subscription.iptrunk.iptrunk_description = iptrunk_description
        iptrunk_subscription.iptrunk.iptrunk_type = iptrunk_type
        iptrunk_subscription.iptrunk.iptrunk_speed = iptrunk_speed
        iptrunk_subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
        iptrunk_subscription.iptrunk.iptrunk_isis_metric = iptrunk_isis_metric
        iptrunk_subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
        iptrunk_subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
        iptrunk_subscription.iptrunk.iptrunk_sides = iptrunk_sides

        iptrunk_subscription = SubscriptionModel.from_other_lifecycle(
            iptrunk_subscription,
            SubscriptionLifecycle.ACTIVE,
        )

        if status:
            iptrunk_subscription.status = status

        iptrunk_subscription.description = description
        iptrunk_subscription.start_date = start_date
        iptrunk_subscription.save()
        db.session.commit()

        return str(iptrunk_subscription.subscription_id)

    return subscription_create

Mohammad Torkashvand
committed
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
@pytest.fixture()
def office_router_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Provide a factory that creates and persists office router subscriptions.

    Missing attributes are generated with Faker; a missing site is created through
    ``site_subscription_factory``. The vendor is always set to Nokia.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        office_router_fqdn=None,
        office_router_ts_port=None,
        office_router_lo_ipv4_address=None,
        office_router_lo_ipv6_address=None,
        office_router_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Store an office router subscription, commit, and return its ID as a string."""
        owner = geant_partner if partner is None else partner

        # Fill in defaults in a fixed order so the sequence of Faker calls stays stable.
        if not description:
            description = faker.text(max_nb_chars=30)
        if not office_router_fqdn:
            office_router_fqdn = faker.domain_name(levels=4)
        if not office_router_ts_port:
            office_router_ts_port = faker.random_int(min=1, max=49151)
        if not office_router_lo_ipv4_address:
            office_router_lo_ipv4_address = ipaddress.IPv4Address(faker.ipv4())
        if not office_router_lo_ipv6_address:
            office_router_lo_ipv6_address = ipaddress.IPv6Address(faker.ipv6())
        if not office_router_site:
            office_router_site = site_subscription_factory()

        inactive = OfficeRouterInactive.from_product_id(
            subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER),
            customer_id=owner["partner_id"],
            insync=True,
        )

        block = inactive.office_router
        block.office_router_fqdn = office_router_fqdn
        block.office_router_ts_port = office_router_ts_port
        block.office_router_lo_ipv4_address = office_router_lo_ipv4_address
        block.office_router_lo_ipv6_address = office_router_lo_ipv6_address
        block.office_router_site = Site.from_subscription(office_router_site).site
        block.vendor = Vendor.NOKIA

        active = SubscriptionModel.from_other_lifecycle(inactive, SubscriptionLifecycle.ACTIVE)
        active.description = description
        active.start_date = start_date
        # An explicit status replaces the one set by the lifecycle transition.
        if status:
            active.status = status

        active.save()
        db.session.commit()

        return str(active.subscription_id)

    return subscription_create
@pytest.fixture()
def super_pop_switch_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Provide a factory that creates and persists Super PoP switch subscriptions.

    Missing attributes are generated with Faker; a missing site is created through
    ``site_subscription_factory``. The vendor is always set to Nokia.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        super_pop_switch_fqdn=None,
        super_pop_switch_ts_port=None,
        super_pop_switch_mgmt_ipv4_address=None,
        super_pop_switch_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Store a Super PoP switch subscription, commit, and return its ID as a string."""
        owner = geant_partner if partner is None else partner

        # Fill in defaults in a fixed order so the sequence of Faker calls stays stable.
        if not description:
            description = faker.text(max_nb_chars=30)
        if not super_pop_switch_fqdn:
            super_pop_switch_fqdn = faker.domain_name(levels=4)
        if not super_pop_switch_ts_port:
            super_pop_switch_ts_port = faker.random_int(min=1, max=49151)
        if not super_pop_switch_mgmt_ipv4_address:
            super_pop_switch_mgmt_ipv4_address = ipaddress.IPv4Address(faker.ipv4())
        if not super_pop_switch_site:
            super_pop_switch_site = site_subscription_factory()

        inactive = SuperPopSwitchInactive.from_product_id(
            subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH),
            customer_id=owner["partner_id"],
            insync=True,
        )

        block = inactive.super_pop_switch
        block.super_pop_switch_fqdn = super_pop_switch_fqdn
        block.super_pop_switch_ts_port = super_pop_switch_ts_port
        block.super_pop_switch_mgmt_ipv4_address = super_pop_switch_mgmt_ipv4_address
        block.super_pop_switch_site = Site.from_subscription(super_pop_switch_site).site
        block.vendor = Vendor.NOKIA

        active = SubscriptionModel.from_other_lifecycle(inactive, SubscriptionLifecycle.ACTIVE)
        active.description = description
        active.start_date = start_date
        # An explicit status replaces the one set by the lifecycle transition.
        if status:
            active.status = status

        active.save()
        db.session.commit()

        return str(active.subscription_id)

    return subscription_create