Skip to content
Snippets Groups Projects

Feature/add ping and dig to IPAM steps

Merged Karel van Klink requested to merge feature/add-ping-to-ipam-steps into develop
3 files
+ 30
2
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -9,10 +9,12 @@ from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, conditional, done, step, workflow
from orchestrator.workflow import StepList, begin, conditional, done, step, step_group, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from ping3 import ping
from pydantic import AfterValidator, ConfigDict, field_validator
from pydantic_forms.validators import ReadOnlyField, validate_unique_list
from pynetbox.models.dcim import Interfaces
@@ -222,6 +224,26 @@ def get_info_from_ipam(subscription: IptrunkInactive) -> State:
return {"subscription": subscription}
@step("Ping all hosts in the assigned IPv4 network")
def ping_all_hosts_v4(subscription: IptrunkInactive) -> None:
"""Ping all hosts in the IPv4 network to verify they're not in use."""
unavailable_hosts = [host for host in subscription.iptrunk.iptrunk_ipv4_network if ping(host, timeout=1)]
if unavailable_hosts:
msg = "One or more hosts in the assigned IPv4 network are responding to ping, please investigate."
raise ProcessFailureError(msg, details=unavailable_hosts)
@step("Ping all hosts in the assigned IPv6 network")
def ping_all_hosts_v6(subscription: IptrunkInactive) -> None:
"""Ping all hosts in the IPv6 network to verify they're not in use."""
unavailable_hosts = [host for host in subscription.iptrunk.iptrunk_ipv6_network if ping(host, timeout=1)]
if unavailable_hosts:
msg = "One or more hosts in the assigned IPv6 network are responding to ping, please investigate."
raise ProcessFailureError(msg, details=unavailable_hosts)
@step("Initialize subscription")
def initialize_subscription(
subscription: IptrunkInactive,
@@ -527,12 +549,16 @@ def create_iptrunk() -> StepList:
side_a_is_nokia = conditional(lambda state: get_router_vendor(state["side_a_node_id"]) == Vendor.NOKIA)
side_b_is_nokia = conditional(lambda state: get_router_vendor(state["side_b_node_id"]) == Vendor.NOKIA)
assign_ip_networks = step_group(
name="Assign IP networks", steps=begin >> get_info_from_ipam >> ping_all_hosts_v4 >> ping_all_hosts_v6
)
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> get_info_from_ipam
>> assign_ip_networks
>> reserve_interfaces_in_netbox
>> lso_interaction(provision_ip_trunk_iface_dry)
>> lso_interaction(provision_ip_trunk_iface_real)
Loading