Skip to content
Snippets Groups Projects

Draft: Add a step in router validation workflow to check if Kentik entry exists if it's a PE router.

1 file
+ 17
0
Compare changes
  • Side-by-side
  • Inline
@@ -14,6 +14,7 @@ from pydantic_forms.types import State, UUIDstr
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import infoblox, lso_client
from gso.services.kentik_client import KentikClient
from gso.services.librenms_client import LibreNMSClient
from gso.services.lso_client import anonymous_lso_interaction, execute_playbook
from gso.services.netbox_client import NetboxClient
@@ -84,6 +85,20 @@ def check_librenms_entry_exists(subscription: Router) -> None:
raise ProcessFailureError(message="LibreNMS configuration error", details=errors)
@step("Verify Kentik entry for PE router")
def check_kentik_entry_exists(subscription: Router) -> None:
"""Validate the Kentik entry for a PE Router.
Raises an HTTP error 404 when the device is not present in Kentik.
"""
client = KentikClient()
device = client.get_device_by_name(subscription.router.router_fqdn)
if not device:
raise ProcessFailureError(
message="Device not found in Kentik", details={"device": subscription.router.router_fqdn}
)
@step("Check base config for drift")
def verify_base_config(subscription: Router, callback_route: str) -> None:
"""Workflow step for running a playbook that checks whether base config has drifted."""
@@ -113,6 +128,7 @@ def validate_router() -> StepList:
* Validate configuration of the iBGP mesh
"""
is_juniper_router = conditional(lambda state: state["subscription"]["router"]["vendor"] == Vendor.JUNIPER)
is_pe_router = conditional(lambda state: state["subscription"]["router"]["router_role"] == RouterRole.PE)
return (
begin
@@ -123,6 +139,7 @@ def validate_router() -> StepList:
>> verify_ipam_loopback
>> check_netbox_entry_exists
>> check_librenms_entry_exists
>> is_pe_router(check_kentik_entry_exists)
>> anonymous_lso_interaction(verify_base_config)
>> anonymous_lso_interaction(verify_p_ibgp)
>> resync
Loading