Skip to content
Snippets Groups Projects
validate_iptrunk.py 5.71 KiB
"""Router validation workflow. Used in a nightly schedule."""

import json

from orchestrator.targets import Target
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from workflows.iptrunk.deploy_twamp import deploy_twamp_dry
from workflows.iptrunk.migrate_iptrunk import check_ip_trunk_isis
from workflows.iptrunk.modify_trunk_interface import provision_ip_trunk_iface_dry

from gso.products.product_types.iptrunk import Iptrunk
from gso.services import infoblox
from gso.services.lso_client import anonymous_lso_interaction, execute_playbook
from gso.utils.workflow_steps import detect_configuration_drift


@step("Validate IP trunk configuration")
def validate_router_config(subscription: Iptrunk, callback_route: str) -> None:
    """Launch the Ansible playbook that validates the configuration of an active IP trunk.

    The subscription is round-tripped through JSON and passed to the playbook as ``wfo_trunk_json``, together with
    the verb ``validate``. The inventory consists of the router FQDNs of the first two trunk sides, one per line.

    :param subscription: The IP trunk subscription whose configuration is validated.
    :param callback_route: Route handed to ``execute_playbook`` as ``callback_route``.
    """
    playbook_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "verb": "validate",
    }

    #  Only the first two sides are addressed, each terminated by a newline.
    router_fqdns = [
        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn,
        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn,
    ]
    target_inventory = "".join(f"{fqdn}\n" for fqdn in router_fqdns)

    execute_playbook(
        playbook_name="base_config.yaml",
        callback_route=callback_route,
        inventory=target_inventory,
        extra_vars=playbook_vars,
    )


@step("Verify IPAM resources for LAG interfaces")
def verify_ipam_records(subscription: Iptrunk) -> None:
    """Validate the :term:`IPAM` resources for the :term:`LAG` interfaces.

    The following is checked, and every problem found is collected before the step fails:

    * Both the IPv4 and the IPv6 network of the trunk can be found in :term:`IPAM`.
    * For every trunk side, an IPv4 and an IPv6 host record exist for the :term:`LAG` FQDN.
    * Each host record holds the expected address, and its comment equals the subscription ID.

    :param subscription: The IP trunk subscription whose :term:`IPAM` records are verified.

    Raises an :class:`orchestrator.utils.errors.ProcessFailureError` if :term:`IPAM` is configured incorrectly.
    """
    ipam_errors = []
    #  Host record comments are text, so compare against the textual form of the subscription ID.
    subscription_id = str(subscription.subscription_id)

    ipam_v4_network = infoblox.find_network_by_cidr(subscription.iptrunk.iptrunk_ipv4_network)
    ipam_v6_network = infoblox.find_network_by_cidr(subscription.iptrunk.iptrunk_ipv6_network)

    if not ipam_v4_network or not ipam_v6_network:
        #  Either lookup may have come back empty: only dereference ``.network`` on a result that exists, otherwise
        #  building the error message itself would raise an AttributeError and hide the real problem.
        actual_v4 = ipam_v4_network.network if ipam_v4_network else None
        actual_v6 = ipam_v6_network.network if ipam_v6_network else None
        ipam_errors.append(
            "Missing IP trunk IPAM records, found the following instead.\n"
            f"IPv4 expected {subscription.iptrunk.iptrunk_ipv4_network}, actual: {actual_v4}\n"
            f"IPv6 expected {subscription.iptrunk.iptrunk_ipv6_network}, actual: {actual_v6}"
        )

    for index, side in enumerate(subscription.iptrunk.iptrunk_sides):
        lag_fqdn = f"{side.iptrunk_side_ae_iface}.{side.iptrunk_side_node.router_fqdn}"
        side_v4 = subscription.iptrunk.iptrunk_ipv4_network[index]
        #  NOTE(review): the IPv6 side address is taken one position further into the network than the IPv4 one,
        #  presumably to skip the first address of the IPv6 network. Confirm against the allocation logic.
        side_v6 = subscription.iptrunk.iptrunk_ipv6_network[index + 1]

        #  Both address families get the same checks; only the lookup and the address attribute differ.
        family_checks = (
            ("IPv4", side_v4, infoblox.find_host_by_fqdn, "ipv4addr"),
            ("IPv6", side_v6, infoblox.find_v6_host_by_fqdn, "ipv6addr"),
        )
        for family, expected_address, find_host, address_attribute in family_checks:
            record = find_host(lag_fqdn)
            if not record:
                ipam_errors.append(f"No {family} host record found with FQDN {lag_fqdn}")
                continue

            #  Allocation inside the network must be correct
            actual_address = getattr(record, address_attribute)
            if str(expected_address) != actual_address:
                ipam_errors.append(
                    f"Incorrectly allocated host record for FQDN {lag_fqdn}.\n"
                    f"Expected {expected_address}, actual: {actual_address}"
                )

            #  Allocated host record needs to be set correctly
            if record.comment != subscription_id:
                ipam_errors.append(
                    f"Incorrect host record found for {lag_fqdn} at {expected_address}. "
                    f"Comment should have been equal to subscription ID {subscription_id}."
                )

    if ipam_errors:
        raise ProcessFailureError(message="IPAM misconfiguration(s) found", details=str(ipam_errors))


@step("Verify Netbox entries")
def verify_netbox_entries() -> None:
    """Validate required entries for an IP trunk in Netbox.

    NOTE(review): this step is currently a placeholder. Its body contains no checks and it receives no subscription,
    so nothing in Netbox is inspected yet.
    """


@workflow(
    "Validate IP trunk configuration",
    target=Target.SYSTEM,
    initial_input_form=wrap_modify_initial_input_form(None),
)
def validate_iptrunk() -> StepList:
    """Validate an existing, active IP Trunk subscription.

    The steps are chained in this order:

    * Store the process subscription and unsync it.
    * Verify that the :term:`LAG` interfaces are correctly configured in :term:`IPAM`.
    * Run the Netbox verification step.
    * Run ``provision_ip_trunk_iface_dry`` as an anonymous LSO interaction, combined with
      ``detect_configuration_drift``.
    * Run ``check_ip_trunk_isis`` as an anonymous LSO interaction.
    * Run ``deploy_twamp_dry`` as an anonymous LSO interaction, combined with ``detect_configuration_drift``.
    * Resync the subscription.
    """
    #  Bookkeeping first: register the subscription on the process and take it out of sync.
    steps = init >> store_process_subscription(Target.SYSTEM) >> unsync

    #  Checks against external inventories.
    steps = steps >> verify_ipam_records >> verify_netbox_entries

    #  Checks that go through LSO.
    steps = steps >> anonymous_lso_interaction(provision_ip_trunk_iface_dry, detect_configuration_drift)
    steps = steps >> anonymous_lso_interaction(check_ip_trunk_isis)
    steps = steps >> anonymous_lso_interaction(deploy_twamp_dry, detect_configuration_drift)

    return steps >> resync >> done