import ipaddress

import pytest
from orchestrator.db import db
from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, UUIDstr

from gso.products import ProductName
from gso.products.product_blocks.iptrunk import (
    IptrunkInterfaceBlock,
    IptrunkSideBlock,
    IptrunkType,
    PhyPortCapacity,
)
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.products.product_types.iptrunk import IptrunkInactive
from gso.products.product_types.office_router import OfficeRouterInactive
from gso.products.product_types.router import Router, RouterInactive
from gso.products.product_types.site import Site, SiteInactive
from gso.products.product_types.super_pop_switch import SuperPopSwitchInactive
from gso.services import subscriptions
from gso.utils.shared_enums import Vendor


@pytest.fixture()
def site_subscription_factory(faker, geant_partner):
    """Return a factory that creates a Site subscription and returns its subscription ID.

    Arguments left at ``None`` are filled with values generated by ``faker``. The subscription
    is moved to the ACTIVE lifecycle, saved, and the database session is committed.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        site_name=None,
        site_city=None,
        site_country=None,
        site_country_code=None,
        site_latitude=None,
        site_longitude=None,
        site_bgp_community_id=None,
        site_internal_id=None,
        site_tier=SiteTier.TIER1,
        site_ts_address=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create a Site subscription; ``status``, when given, overrides the ACTIVE status."""
        if partner is None:
            partner = geant_partner

        description = description or "Site Subscription"
        site_name = site_name or faker.domain_word()
        site_city = site_city or faker.city()
        site_country = site_country or faker.country()
        site_country_code = site_country_code or faker.country_code()
        # Numeric arguments are compared against None rather than tested for truthiness, so an
        # explicit 0 / 0.0 passed by the caller is honoured instead of being replaced.
        if site_latitude is None:
            site_latitude = float(faker.latitude())
        if site_longitude is None:
            site_longitude = float(faker.longitude())
        if site_bgp_community_id is None:
            site_bgp_community_id = faker.pyint()
        if site_internal_id is None:
            site_internal_id = faker.pyint()
        site_ts_address = site_ts_address or faker.ipv4()

        product_id = subscriptions.get_product_id_by_name(ProductName.SITE)
        site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
        site_subscription.site.site_city = site_city
        site_subscription.site.site_name = site_name
        site_subscription.site.site_country = site_country
        site_subscription.site.site_country_code = site_country_code
        site_subscription.site.site_latitude = site_latitude
        site_subscription.site.site_longitude = site_longitude
        site_subscription.site.site_bgp_community_id = site_bgp_community_id
        site_subscription.site.site_internal_id = site_internal_id
        site_subscription.site.site_tier = site_tier
        site_subscription.site.site_ts_address = site_ts_address

        site_subscription = SubscriptionModel.from_other_lifecycle(site_subscription, SubscriptionLifecycle.ACTIVE)
        site_subscription.description = description
        site_subscription.start_date = start_date

        if status:
            site_subscription.status = status

        site_subscription.save()
        db.session.commit()

        return str(site_subscription.subscription_id)

    return subscription_create


def _make_router_subscription_factory(vendor: Vendor, site_subscription_factory, faker, geant_partner):
    """Build the ``subscription_create`` callable shared by the per-vendor router fixtures.

    The returned callable creates a Router subscription whose ``vendor`` attribute is set to
    ``vendor``; everything else is identical between vendors.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        router_fqdn=None,
        router_ts_port=None,
        router_access_via_ts=None,
        router_lo_ipv4_address=None,
        router_lo_ipv6_address=None,
        router_lo_iso_address=None,
        router_role=RouterRole.PE,
        router_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create a Router subscription and return its subscription ID as a string.

        ``router_site`` is the subscription ID of a Site; when omitted a new Site subscription
        is created through ``site_subscription_factory``.
        """
        if partner is None:
            partner = geant_partner

        description = description or faker.text(max_nb_chars=30)
        router_fqdn = router_fqdn or faker.domain_name(levels=4)
        router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
        # Compare against None: with ``or`` an explicit ``False`` would be replaced by a random boolean.
        if router_access_via_ts is None:
            router_access_via_ts = faker.boolean()
        router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
        router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
        router_lo_iso_address = router_lo_iso_address or faker.word()
        router_site = router_site or site_subscription_factory()

        product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER)
        router_subscription = RouterInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
        router_subscription.router.router_fqdn = router_fqdn
        router_subscription.router.router_ts_port = router_ts_port
        router_subscription.router.router_access_via_ts = router_access_via_ts
        router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
        router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
        router_subscription.router.router_lo_iso_address = router_lo_iso_address
        router_subscription.router.router_role = router_role
        router_subscription.router.router_site = Site.from_subscription(router_site).site
        router_subscription.router.vendor = vendor

        router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
        router_subscription.description = description
        router_subscription.start_date = start_date

        if status:
            router_subscription.status = status

        router_subscription.save()
        db.session.commit()

        return str(router_subscription.subscription_id)

    return subscription_create


@pytest.fixture()
def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Return a factory that creates Router subscriptions with ``vendor`` set to ``Vendor.NOKIA``."""
    return _make_router_subscription_factory(Vendor.NOKIA, site_subscription_factory, faker, geant_partner)


@pytest.fixture()
def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Return a factory that creates Router subscriptions with ``vendor`` set to ``Vendor.JUNIPER``."""
    return _make_router_subscription_factory(Vendor.JUNIPER, site_subscription_factory, faker, geant_partner)


@pytest.fixture()
def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker):
    """Return a factory that builds a new ``IptrunkSideBlock``.

    Unlike the subscription factories in this module, the returned callable yields a product
    block instance rather than a subscription ID, and does not save or commit anything itself
    (creating the default router through ``nokia_router_subscription_factory`` does).
    """

    def subscription_create(
        iptrunk_side_node=None,
        iptrunk_side_ae_iface=None,
        iptrunk_side_ae_geant_a_sid=None,
        iptrunk_side_ae_members=None,
        iptrunk_side_ae_members_description=None,
    ) -> IptrunkSideBlock:
        """Build one trunk side.

        ``iptrunk_side_node`` is the subscription ID of a Router; when omitted a new router
        subscription is created. ``iptrunk_side_ae_members`` defaults to two generated
        interface blocks. ``iptrunk_side_ae_members_description`` is passed through as given,
        with no default generated.
        """
        iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory()
        iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router
        iptrunk_side_ae_iface = iptrunk_side_ae_iface or faker.pystr()
        iptrunk_side_ae_geant_a_sid = iptrunk_side_ae_geant_a_sid or faker.geant_sid()
        iptrunk_side_ae_members = iptrunk_side_ae_members or [
            IptrunkInterfaceBlock.new(
                faker.uuid4(),
                interface_name=faker.network_interface(),
                interface_description=faker.sentence(),
            ),
            IptrunkInterfaceBlock.new(
                faker.uuid4(),
                interface_name=faker.network_interface(),
                interface_description=faker.sentence(),
            ),
        ]

        return IptrunkSideBlock.new(
            faker.uuid4(),
            iptrunk_side_node=iptrunk_side_node,
            iptrunk_side_ae_iface=iptrunk_side_ae_iface,
            iptrunk_side_ae_geant_a_sid=iptrunk_side_ae_geant_a_sid,
            iptrunk_side_ae_members=iptrunk_side_ae_members,
            iptrunk_side_ae_members_description=iptrunk_side_ae_members_description,
        )

    return subscription_create


@pytest.fixture()
def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner):
    """Return a factory that creates an IP trunk subscription and returns its subscription ID.

    Arguments left at ``None`` are filled with values generated by ``faker``. The subscription
    is moved to the ACTIVE lifecycle, saved, and the database session is committed.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        geant_s_sid=None,
        iptrunk_description=None,
        iptrunk_type=IptrunkType.DARK_FIBER,
        iptrunk_speed=PhyPortCapacity.ONE_GIGABIT_PER_SECOND,
        iptrunk_isis_metric=None,
        iptrunk_ipv4_network=None,
        iptrunk_ipv6_network=None,
        iptrunk_sides=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create an IP trunk subscription; ``status``, when given, overrides the ACTIVE status."""
        if partner is None:
            partner = geant_partner

        product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK)
        description = description or faker.sentence()
        geant_s_sid = geant_s_sid or faker.geant_sid()
        iptrunk_description = iptrunk_description or faker.sentence()
        # Compare against None so that an explicit metric of 0 is not replaced by a random value.
        if iptrunk_isis_metric is None:
            iptrunk_isis_metric = faker.pyint()
        iptrunk_ipv4_network = iptrunk_ipv4_network or faker.ipv4_network(max_subnet=31)
        iptrunk_ipv6_network = iptrunk_ipv6_network or faker.ipv6_network(max_subnet=126)
        iptrunk_minimum_links = 1
        # NOTE(review): both default sides are built even when ``iptrunk_sides`` is supplied, which
        # creates two extra router subscriptions. Kept as-is because other tests may observe those
        # rows; consider making this lazy.
        iptrunk_side_a = iptrunk_side_subscription_factory()
        iptrunk_side_b = iptrunk_side_subscription_factory()
        iptrunk_sides = iptrunk_sides or [iptrunk_side_a, iptrunk_side_b]

        iptrunk_subscription = IptrunkInactive.from_product_id(
            product_id, customer_id=partner["partner_id"], insync=True
        )
        iptrunk_subscription.iptrunk.geant_s_sid = geant_s_sid
        iptrunk_subscription.iptrunk.iptrunk_description = iptrunk_description
        iptrunk_subscription.iptrunk.iptrunk_type = iptrunk_type
        iptrunk_subscription.iptrunk.iptrunk_speed = iptrunk_speed
        iptrunk_subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
        iptrunk_subscription.iptrunk.iptrunk_isis_metric = iptrunk_isis_metric
        iptrunk_subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
        iptrunk_subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
        iptrunk_subscription.iptrunk.iptrunk_sides = iptrunk_sides

        iptrunk_subscription = SubscriptionModel.from_other_lifecycle(
            iptrunk_subscription,
            SubscriptionLifecycle.ACTIVE,
        )

        if status:
            iptrunk_subscription.status = status

        iptrunk_subscription.description = description
        iptrunk_subscription.start_date = start_date
        iptrunk_subscription.save()
        db.session.commit()

        return str(iptrunk_subscription.subscription_id)

    return subscription_create


@pytest.fixture()
def office_router_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Return a factory that creates an Office Router subscription and returns its subscription ID.

    Arguments left at ``None`` are filled with values generated by ``faker``; the vendor is
    always set to ``Vendor.NOKIA``. The subscription is moved to the ACTIVE lifecycle, saved,
    and the database session is committed.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        office_router_fqdn=None,
        office_router_ts_port=None,
        office_router_lo_ipv4_address=None,
        office_router_lo_ipv6_address=None,
        office_router_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create an Office Router subscription.

        ``office_router_site`` is the subscription ID of a Site; when omitted a new Site
        subscription is created through ``site_subscription_factory``.
        """
        if partner is None:
            partner = geant_partner

        description = description or faker.text(max_nb_chars=30)
        office_router_fqdn = office_router_fqdn or faker.domain_name(levels=4)
        office_router_ts_port = office_router_ts_port or faker.random_int(min=1, max=49151)
        office_router_lo_ipv4_address = office_router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
        office_router_lo_ipv6_address = office_router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
        office_router_site = office_router_site or site_subscription_factory()

        product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER)
        office_router_subscription = OfficeRouterInactive.from_product_id(
            product_id, customer_id=partner["partner_id"], insync=True
        )
        office_router_subscription.office_router.office_router_fqdn = office_router_fqdn
        office_router_subscription.office_router.office_router_ts_port = office_router_ts_port
        office_router_subscription.office_router.office_router_lo_ipv4_address = office_router_lo_ipv4_address
        office_router_subscription.office_router.office_router_lo_ipv6_address = office_router_lo_ipv6_address
        office_router_subscription.office_router.office_router_site = Site.from_subscription(office_router_site).site
        office_router_subscription.office_router.vendor = Vendor.NOKIA

        office_router_subscription = SubscriptionModel.from_other_lifecycle(
            office_router_subscription, SubscriptionLifecycle.ACTIVE
        )
        office_router_subscription.description = description
        office_router_subscription.start_date = start_date

        if status:
            office_router_subscription.status = status

        office_router_subscription.save()
        db.session.commit()

        return str(office_router_subscription.subscription_id)

    return subscription_create


@pytest.fixture()
def super_pop_switch_subscription_factory(site_subscription_factory, faker, geant_partner):
    """Return a factory that creates a Super PoP Switch subscription and returns its subscription ID.

    Arguments left at ``None`` are filled with values generated by ``faker``; the vendor is
    always set to ``Vendor.NOKIA``. The subscription is moved to the ACTIVE lifecycle, saved,
    and the database session is committed.
    """

    def subscription_create(
        description=None,
        start_date="2023-05-24T00:00:00+00:00",
        super_pop_switch_fqdn=None,
        super_pop_switch_ts_port=None,
        super_pop_switch_mgmt_ipv4_address=None,
        super_pop_switch_site=None,
        status: SubscriptionLifecycle | None = None,
        partner: dict | None = None,
    ) -> UUIDstr:
        """Create a Super PoP Switch subscription.

        ``super_pop_switch_site`` is the subscription ID of a Site; when omitted a new Site
        subscription is created through ``site_subscription_factory``.
        """
        if partner is None:
            partner = geant_partner

        description = description or faker.text(max_nb_chars=30)
        super_pop_switch_fqdn = super_pop_switch_fqdn or faker.domain_name(levels=4)
        super_pop_switch_ts_port = super_pop_switch_ts_port or faker.random_int(min=1, max=49151)
        super_pop_switch_mgmt_ipv4_address = super_pop_switch_mgmt_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
        super_pop_switch_site = super_pop_switch_site or site_subscription_factory()

        product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH)
        super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id(
            product_id, customer_id=partner["partner_id"], insync=True
        )
        super_pop_switch_subscription.super_pop_switch.super_pop_switch_fqdn = super_pop_switch_fqdn
        super_pop_switch_subscription.super_pop_switch.super_pop_switch_ts_port = super_pop_switch_ts_port
        super_pop_switch_subscription.super_pop_switch.super_pop_switch_mgmt_ipv4_address = (
            super_pop_switch_mgmt_ipv4_address
        )
        super_pop_switch_subscription.super_pop_switch.super_pop_switch_site = Site.from_subscription(
            super_pop_switch_site
        ).site
        super_pop_switch_subscription.super_pop_switch.vendor = Vendor.NOKIA

        super_pop_switch_subscription = SubscriptionModel.from_other_lifecycle(
            super_pop_switch_subscription, SubscriptionLifecycle.ACTIVE
        )
        super_pop_switch_subscription.description = description
        super_pop_switch_subscription.start_date = start_date

        if status:
            super_pop_switch_subscription.status = status

        super_pop_switch_subscription.save()
        db.session.commit()

        return str(super_pop_switch_subscription.subscription_id)

    return subscription_create