Skip to content
Snippets Groups Projects

Feature/add ping and dig to IPAM steps

Merged Karel van Klink requested to merge feature/add-ping-to-ipam-steps into develop
3 files
+ 30
2
Compare changes
  • Side-by-side
  • Inline
Files
3
"""A creation workflow that deploys a new IP trunk service."""
import json
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Annotated
from uuid import uuid4
@@ -9,10 +10,12 @@ from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, conditional, done, step, workflow
from orchestrator.workflow import StepList, begin, conditional, done, step, step_group, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from ping3 import ping
from pydantic import AfterValidator, ConfigDict, field_validator
from pydantic_forms.validators import ReadOnlyField, validate_unique_list
from pynetbox.models.dcim import Interfaces
@@ -23,13 +26,14 @@ from gso.products.product_blocks.iptrunk import (
IptrunkType,
PhysicalPortCapacity,
)
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.iptrunk import Iptrunk, IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.services.sharepoint import SharePointClient
from gso.services.subscriptions import get_non_terminated_iptrunk_subscriptions
from gso.settings import load_oss_params
from gso.utils.helpers import (
LAGMember,
@@ -210,16 +214,94 @@ def create_subscription(product: UUIDstr, partner: str) -> State:
@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkInactive) -> State:
"""Allocate IP resources in :term:`IPAM`."""
subscription.iptrunk.iptrunk_ipv4_network = infoblox.allocate_v4_network(
new_ipv4_network = infoblox.allocate_v4_network(
"TRUNK",
subscription.iptrunk.iptrunk_description,
)
subscription.iptrunk.iptrunk_ipv6_network = infoblox.allocate_v6_network(
new_ipv6_network = infoblox.allocate_v6_network(
"TRUNK",
subscription.iptrunk.iptrunk_description,
)
subscription.iptrunk.iptrunk_ipv4_network = new_ipv4_network
subscription.iptrunk.iptrunk_ipv6_network = new_ipv6_network
return {"subscription": subscription}
return {
"subscription": subscription,
"new_ipv4_network": str(new_ipv4_network),
"new_ipv6_network": str(new_ipv6_network),
}
@step("Check if assigned networks are already taken by other trunk subscription")
def check_existing_trunk_allocations(subscription: IptrunkInactive) -> None:
"""Check if there already is a trunk with the same network resources assigned to it."""
if not subscription.iptrunk.iptrunk_ipv4_network or not subscription.iptrunk.iptrunk_ipv6_network:
msg = "Missing IP resources in subscription object."
raise ProcessFailureError(
msg, details=[subscription.iptrunk.iptrunk_ipv4_network, subscription.iptrunk.iptrunk_ipv6_network]
)
all_trunks = [
Iptrunk.from_subscription(trunk["subscription_id"])
for trunk in get_non_terminated_iptrunk_subscriptions()
if trunk["subscription_id"] != subscription.subscription_id
]
overlapping_ipv4_networks = [
(trunk.description, trunk.iptrunk.iptrunk_ipv4_network)
for trunk in all_trunks
if trunk.iptrunk.iptrunk_ipv4_network.overlaps(subscription.iptrunk.iptrunk_ipv4_network)
]
overlapping_ipv6_networks = [
(trunk.description, trunk.iptrunk.iptrunk_ipv6_network)
for trunk in all_trunks
if trunk.iptrunk.iptrunk_ipv6_network.overlaps(subscription.iptrunk.iptrunk_ipv6_network)
]
if overlapping_ipv4_networks or overlapping_ipv6_networks:
msg = "Newly assigned IP networks overlap with existing IP trunk subscriptions, please investigate."
raise ProcessFailureError(msg, details=[overlapping_ipv4_networks, overlapping_ipv6_networks])
@step("Check for existing DNS records in the assigned IPv4 network")
def dig_all_hosts_v4(new_ipv4_network: str) -> None:
"""Check if any hosts have already been assigned inside the IPv4 network in Netbox."""
registered_hosts = [host for host in IPv4Network(new_ipv4_network) if infoblox.find_host_by_ip(IPv4Address(host))]
if registered_hosts:
msg = "One or more hosts in the assigned IPv4 network are already registered, please investigate."
raise ProcessFailureError(msg, details=registered_hosts)
@step("Check for existing DNS records in the assigned IPv6 network")
def dig_all_hosts_v6(new_ipv6_network: str) -> None:
"""Check if any hosts have already been assigned inside the IPv6 network in Netbox."""
registered_hosts = [host for host in IPv6Network(new_ipv6_network) if infoblox.find_host_by_ip(IPv6Address(host))]
if registered_hosts:
msg = "One or more hosts in the assigned IPv6 network are already registered, please investigate."
raise ProcessFailureError(msg, details=registered_hosts)
@step("Ping all hosts in the assigned IPv4 network")
def ping_all_hosts_v4(new_ipv4_network: str) -> None:
"""Ping all hosts in the IPv4 network to verify they're not in use."""
unavailable_hosts = [host for host in IPv4Network(new_ipv4_network) if ping(str(host), timeout=1)]
if unavailable_hosts:
msg = "One or more hosts in the assigned IPv4 network are responding to ping, please investigate."
raise ProcessFailureError(msg, details=unavailable_hosts)
@step("Ping all hosts in the assigned IPv6 network")
def ping_all_hosts_v6(new_ipv6_network: str) -> State:
"""Ping all hosts in the IPv6 network to verify they're not in use."""
unavailable_hosts = [host for host in IPv6Network(new_ipv6_network) if ping(str(host), timeout=1)]
if unavailable_hosts:
msg = "One or more hosts in the assigned IPv6 network are responding to ping, please investigate."
raise ProcessFailureError(msg, details=unavailable_hosts)
return {"__remove_keys": ["new_ipv4_network", "new_ipv6_network"]}
@step("Initialize subscription")
@@ -527,12 +609,25 @@ def create_iptrunk() -> StepList:
side_a_is_nokia = conditional(lambda state: get_router_vendor(state["side_a_node_id"]) == Vendor.NOKIA)
side_b_is_nokia = conditional(lambda state: get_router_vendor(state["side_b_node_id"]) == Vendor.NOKIA)
assign_ip_networks = step_group(
name="Assign IP networks",
steps=(
begin
>> get_info_from_ipam
>> check_existing_trunk_allocations
>> dig_all_hosts_v4
>> dig_all_hosts_v6
>> ping_all_hosts_v4
>> ping_all_hosts_v6
),
)
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> get_info_from_ipam
>> assign_ip_networks
>> reserve_interfaces_in_netbox
>> lso_interaction(provision_ip_trunk_iface_dry)
>> lso_interaction(provision_ip_trunk_iface_real)
Loading