Skip to content
Snippets Groups Projects
Verified Commit 4285b1e1 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Check if other IP trunk subscriptions are already overlapping with assigned IP resources

parent 328ac9be
No related branches found
No related tags found
1 merge request!252Feature/add ping and dig to IPAM steps
Pipeline #88618 passed
......@@ -136,6 +136,19 @@ def get_active_iptrunk_subscriptions(includes: list[str] | None = None) -> list[
)
def get_non_terminated_iptrunk_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve all IP trunk subscriptions that are not terminated.
:param list[Subscription] includes: Fields to be included in the returned Subscription objects.
:return list[Subscription]: A list of IP trunk subscriptions.
"""
return get_subscriptions(
product_types=[ProductType.IP_TRUNK],
lifecycles=[SubscriptionLifecycle.INITIAL, SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
includes=includes,
)
def get_trunks_that_terminate_on_router(
subscription_id: UUIDstr, lifecycle_state: SubscriptionLifecycle
) -> list[SubscriptionTable]:
......
......@@ -26,13 +26,14 @@ from gso.products.product_blocks.iptrunk import (
IptrunkType,
PhysicalPortCapacity,
)
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.iptrunk import Iptrunk, IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.services.sharepoint import SharePointClient
from gso.services.subscriptions import get_non_terminated_iptrunk_subscriptions
from gso.settings import load_oss_params
from gso.utils.helpers import (
LAGMember,
......@@ -231,6 +232,36 @@ def get_info_from_ipam(subscription: IptrunkInactive) -> State:
}
@step("Check if assigned networks are already taken by other trunk subscription")
def check_existing_trunk_allocations(subscription: IptrunkInactive) -> None:
"""Check if there already is a trunk with the same network resources assigned to it."""
if not subscription.iptrunk.iptrunk_ipv4_network or not subscription.iptrunk.iptrunk_ipv6_network:
msg = "Missing IP resources in subscription object."
raise ProcessFailureError(
msg, details=[subscription.iptrunk.iptrunk_ipv4_network, subscription.iptrunk.iptrunk_ipv6_network]
)
all_trunks = [
Iptrunk.from_subscription(trunk["subscription_id"])
for trunk in get_non_terminated_iptrunk_subscriptions()
if trunk["subscription_id"] != subscription.subscription_id
]
overlapping_ipv4_networks = [
(trunk.description, trunk.iptrunk.iptrunk_ipv4_network)
for trunk in all_trunks
if trunk.iptrunk.iptrunk_ipv4_network.overlaps(subscription.iptrunk.iptrunk_ipv4_network)
]
overlapping_ipv6_networks = [
(trunk.description, trunk.iptrunk.iptrunk_ipv6_network)
for trunk in all_trunks
if trunk.iptrunk.iptrunk_ipv6_network.overlaps(subscription.iptrunk.iptrunk_ipv6_network)
]
if overlapping_ipv4_networks or overlapping_ipv6_networks:
msg = "Newly assigned IP networks overlap with existing IP trunk subscriptions, please investigate."
raise ProcessFailureError(msg, details=[overlapping_ipv4_networks, overlapping_ipv6_networks])
@step("Check for existing DNS records in the assigned IPv4 network")
def dig_all_hosts_v4(new_ipv4_network: str) -> None:
"""Check if any hosts have already been assigned inside the IPv4 network in Netbox."""
......@@ -583,6 +614,7 @@ def create_iptrunk() -> StepList:
steps=(
begin
>> get_info_from_ipam
>> check_existing_trunk_allocations
>> dig_all_hosts_v4
>> dig_all_hosts_v6
>> ping_all_hosts_v4
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment