# Skip to content
# Snippets Groups Projects
# netbox_client.py 12.66 KiB
"""Contain all methods to communicate with the NetBox API endpoint: Data Center Infrastructure Management (DCIM)."""

from contextlib import suppress
from uuid import UUID

import pydantic
import pynetbox
from orchestrator.types import UUIDstr
from pynetbox.models.dcim import Devices, DeviceTypes, Interfaces

from gso.products.product_types.router import Router
from gso.settings import load_oss_params
from gso.utils.device_info import (
    DEFAULT_SITE,
    FEASIBLE_IP_TRUNK_LAG_RANGE,
    FEASIBLE_SERVICES_LAG_RANGE,
    ROUTER_ROLE,
    TierInfo,
)
from gso.utils.exceptions import NotFoundError, WorkflowStateError


class Manufacturer(pydantic.BaseModel):
    """Defines the manufacturer of a device.

    ``NetboxClient.create_device_manufacturer`` builds this model and sends its fields to Netbox as the
    payload for creating a manufacturer.
    """

    # Name of the manufacturer; ``NetboxClient.create_device_type`` looks manufacturers up by this name.
    name: str
    # Slug sent to Netbox together with the name.
    slug: str


class DeviceType(pydantic.BaseModel):
    """Defines the device type.

    The manufacturer should be created first to get the manufacturer id, which is defined here as int.
    ``NetboxClient.create_device_type`` builds this model and sends its fields to Netbox as the payload for
    creating a device type.
    """

    # Numeric Netbox id of the manufacturer (not its name).
    manufacturer: int
    # Model name of the device type; ``NetboxClient.create_device`` looks device types up by this value.
    model: str
    # Slug sent to Netbox together with the model.
    slug: str


class DeviceRole(pydantic.BaseModel):
    """Defines the role of a device.

    ``NetboxClient.create_device_role`` builds this model and sends its fields to Netbox as the payload for
    creating a device role.
    """

    # Name of the role; ``NetboxClient.create_device`` looks the router role up by name.
    name: str
    # Slug sent to Netbox together with the name.
    slug: str


class Site(pydantic.BaseModel):
    """Defines the site of a device.

    ``NetboxClient.create_device_site`` builds this model and sends its fields to Netbox as the payload for
    creating a site.
    """

    # Name of the site; ``NetboxClient.create_device`` looks the default site up by name.
    name: str
    # Slug sent to Netbox together with the name.
    slug: str


class NetboxClient:
    """Implement all methods to communicate with the Netbox API."""

    def __init__(self) -> None:
        """Build a client for the Netbox instance configured in the OSS parameters.

        The ``NETBOX`` section of the loaded OSS parameters is kept on ``netbox_params``; a pynetbox API
        handle created from that section's ``api`` and ``token`` values is kept on ``netbox``.
        """
        params = load_oss_params().NETBOX
        self.netbox_params = params
        self.netbox = pynetbox.api(params.api, params.token)

    def get_all_devices(self) -> list[Devices]:
        """Get all devices in Netbox."""
        return list(self.netbox.dcim.devices.all())

    def get_allocated_interfaces_by_gso_subscription(self, device_name: str, subscription_id: UUID) -> list[Interfaces]:
        """Return all allocated interfaces of a device by name."""
        device = self.get_device_by_name(device_name)
        return self.netbox.dcim.interfaces.filter(
            device_id=device.id,
            enabled=True,
            mark_connected=True,
            description=subscription_id,
        )

    def get_device_by_name(self, device_name: str) -> Devices:
        """Return the device object by name from netbox, or raise not found."""
        device = self.netbox.dcim.devices.get(name=device_name)
        if device is None:
            msg = f"Device: {device_name} not found."
            raise NotFoundError(msg)
        return device

    def get_interface_by_name_and_device(self, iface_name: str, device_name: str) -> Interfaces:
        """Return the interface lists by name and device name from netbox."""
        device = self.get_device_by_name(device_name)
        interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
        if interface is None:
            msg = f"Interface: {iface_name} on device with id: {device.id} not found."
            raise NotFoundError(msg)
        return interface

    def get_interfaces_by_device(self, device_name: str, speed: str) -> list[Interfaces]:
        """Get all interfaces of a device by name and speed that are not reserved and not allocated."""
        device = self.get_device_by_name(device_name)
        return list(
            self.netbox.dcim.interfaces.filter(device_id=device.id, enabled=False, mark_connected=False, speed=speed),
        )

    def create_interface(
        self,
        iface_name: str,
        interface_type: str,
        device_name: str,
        description: str | None = None,
        *,
        enabled: bool = False,
    ) -> Interfaces:
        """Create new interface on a device, where device is defined by name.

        The type parameter can be 1000base-t, 10gbase-t, LAG, etc.
        For more details on type definition have  a look in choices.py in the netbox API implementation in module DCIM.

        Returns:
        the new interface object as dict.
        """
        device = self.get_device_by_name(device_name)

        return self.netbox.dcim.interfaces.create(
            name=iface_name,
            type=interface_type,
            enabled=enabled,
            mark_connected=False,
            device=device.id,
            description=description,
        )

    def delete_interface(self, device_name: str, iface_name: str) -> None:
        """Delete an interface from a device by name."""
        interface = self.get_interface_by_name_and_device(iface_name, device_name)
        if interface:
            return interface.delete()
        return None

    def create_device_type(self, manufacturer: str, model: str, slug: str) -> DeviceTypes:
        """Create a new device type in Netbox for an existing manufacturer.

        Args:
            manufacturer: Name of the manufacturer, which must already exist in Netbox.
            model: Model name of the new device type.
            slug: Slug of the new device type.

        Returns:
            The newly created device type, as returned by the Netbox API client.

        Raises:
            NotFoundError: If no manufacturer with the given name exists.
        """
        # The device type payload needs the numeric manufacturer id, so resolve the name first.
        manufacturer_record = self.netbox.dcim.manufacturers.get(name=manufacturer)
        if manufacturer_record is None:
            # Report the missing manufacturer explicitly instead of failing on ``None.id``.
            msg = f"Manufacturer: {manufacturer} not found."
            raise NotFoundError(msg)

        device_type = DeviceType(manufacturer=int(manufacturer_record.id), model=model, slug=slug)
        return self.netbox.dcim.device_types.create(dict(device_type))

    def create_device_role(self, name: str, slug: str) -> DeviceRole:
        """Register a new device role in Netbox.

        The name and slug are validated through the ``DeviceRole`` model before being sent.

        NOTE(review): the value handed back is whatever the Netbox client's ``create`` call returns, which
        may not be the ``DeviceRole`` model named in the annotation - confirm what callers expect.
        """
        role_payload = dict(DeviceRole(name=name, slug=slug))
        return self.netbox.dcim.device_roles.create(role_payload)

    def create_device_site(self, name: str, slug: str) -> Site:
        """Register a new site for devices in Netbox.

        The name and slug are validated through the ``Site`` model before being sent.

        NOTE(review): the value handed back is whatever the Netbox client's ``create`` call returns, which
        may not be the ``Site`` model named in the annotation - confirm what callers expect.
        """
        site_payload = dict(Site(name=name, slug=slug))
        return self.netbox.dcim.sites.create(site_payload)

    def create_device_manufacturer(self, name: str, slug: str) -> Manufacturer:
        """Register a new device manufacturer in Netbox.

        The name and slug are validated through the ``Manufacturer`` model before being sent.

        NOTE(review): the value handed back is whatever the Netbox client's ``create`` call returns, which
        may not be the ``Manufacturer`` model named in the annotation - confirm what callers expect.
        """
        manufacturer_payload = dict(Manufacturer(name=name, slug=slug))
        return self.netbox.dcim.manufacturers.create(manufacturer_payload)

    @staticmethod
    def calculate_interface_speed(interface: Interfaces) -> int | None:
        """Calculate the interface speed in bits per second."""
        type_parts = interface.type.value.split("-")
        if "gbase" in type_parts[0]:
            return int("".join(filter(str.isdigit, type_parts[0]))) * 1000000
        return None

    def create_device(self, device_name: str, site_tier: str) -> Devices:
        """Create a new device in Netbox and populate its module bays and interfaces.

        The device type, the module type, and the module bay positions to populate are taken from the tier
        information for ``tier<site_tier>``. The device is created with the router role and the default site.
        After creation, a module is installed in every module bay whose position is listed for the tier, and
        every interface of the device is disabled and given a speed derived from its type.

        Args:
            device_name: Name of the new device.
            site_tier: Tier of the site, used to select the tier information.

        Returns:
            The newly created device.

        Raises:
            NotFoundError: If the device type, device role, site, or module type needed for the device does
                not exist in Netbox. Nothing is created in that case.
        """
        tier_info = TierInfo().get_module_by_name(f"tier{site_tier}")

        # Resolve everything the new device depends on before creating anything, so that a missing
        # prerequisite cannot leave a half-provisioned device behind in Netbox.
        device_type = self.netbox.dcim.device_types.get(model=tier_info.device_type)
        device_role = self.netbox.dcim.device_roles.get(name=ROUTER_ROLE["name"])
        device_site = self.netbox.dcim.sites.get(name=DEFAULT_SITE["name"])
        card_type = self.netbox.dcim.module_types.get(model=tier_info.module_type)
        required_objects = (
            ("Device type", tier_info.device_type, device_type),
            ("Device role", ROUTER_ROLE["name"], device_role),
            ("Site", DEFAULT_SITE["name"], device_site),
            ("Module type", tier_info.module_type, card_type),
        )
        for kind, lookup_value, record in required_objects:
            if record is None:
                msg = f"{kind}: {lookup_value} not found."
                raise NotFoundError(msg)

        # Create new device
        device = self.netbox.dcim.devices.create(
            name=device_name,
            device_type=device_type.id,
            role=device_role.id,
            site=device_site.id,
        )

        # Install a module in each bay of the new device whose position is configured for this tier.
        module_bays = self.netbox.dcim.module_bays.filter(device_id=device.id)
        valid_module_bays = [bay for bay in module_bays if int(bay.position) in tier_info.module_bays_slots]
        for module_bay in valid_module_bays:
            # NOTE(review): ``enabled`` is passed exactly as before; confirm that Netbox modules use this field.
            self.netbox.dcim.modules.create(
                device=device.id,
                module_bay=module_bay.id,
                module_type=card_type.id,
                status="active",
                enabled=False,
                comments="Installed via pynetbox",
            )

        # Start every interface of the device disabled, with a speed derived from its type (the speed is
        # ``None`` for types that ``calculate_interface_speed`` does not recognise).
        for interface in self.netbox.dcim.interfaces.filter(device_id=device.id):
            interface.speed = self.calculate_interface_speed(interface)
            interface.enabled = False
            interface.save()

        return device

    def delete_device(self, device_name: str) -> None:
        """Delete device by name if exists."""
        with suppress(AttributeError):
            self.netbox.dcim.devices.get(name=device_name).delete()

    def attach_interface_to_lag(
        self,
        device_name: str,
        lag_name: str,
        iface_name: str,
        description: str | None = None,
    ) -> Interfaces:
        """Assign a given interface to a LAG.

        Returns:
        the interface object after assignment.
        """
        iface = self.get_interface_by_name_and_device(iface_name, device_name)

        # Get LAG
        lag = self.get_interface_by_name_and_device(lag_name, device_name)

        # Assign interface to LAG, ensuring it does not already belong to a LAG.
        if iface.lag:
            msg = f"The interface: {iface_name} on device: {device_name} already belongs to a LAG: {iface.lag.name}."
            raise WorkflowStateError(msg)
        iface.lag = lag.id

        # Set description if provided
        if description:
            iface.description = description

        iface.save()
        return iface

    def reserve_interface(self, device_name: str, iface_name: str) -> Interfaces:
        """Reserve an interface by enabling it."""
        # First get interface from device
        interface = self.get_interface_by_name_and_device(iface_name, device_name)

        # Check if interface is reserved
        if interface.enabled:
            msg = f"The interface: {iface_name} on device: {device_name} is already reserved."
            raise WorkflowStateError(msg)

        # Reserve interface by enabling it
        interface.enabled = True
        interface.save()

        return interface

    def allocate_interface(self, device_name: str, iface_name: str) -> Interfaces:
        """Allocate an interface by marking it as connected."""
        # First get interface from device
        interface = self.get_interface_by_name_and_device(iface_name, device_name)

        # Check if interface is reserved
        if interface.mark_connected:
            msg = f"The interface: {iface_name} on device: {device_name} is already allocated."
            raise WorkflowStateError(msg)

        # Allocate interface by marking it as connected
        interface.mark_connected = True
        interface.save()

        return interface

    def free_interface(self, device_name: str, iface_name: str) -> Interfaces:
        """Free interface by marking disconnect and disable it."""
        # First get interface from device
        interface = self.get_interface_by_name_and_device(iface_name, device_name)
        interface.mark_connected = False
        interface.enabled = False
        interface.description = ""
        interface.save()

        return interface

    def detach_interfaces_from_lag(self, device_name: str, lag_name: str) -> None:
        """Detach all interfaces from a LAG."""
        device = self.get_device_by_name(device_name)
        lag = self.netbox.dcim.interfaces.get(device_id=device.id, name=lag_name)
        for interface in self.netbox.dcim.interfaces.filter(
            device_id=device.id,
            lag_id=lag.id,
            enabled=False,
            mark_connected=False,
        ):
            interface.lag = None
            interface.save()

    def get_available_lags_in_range(self, router_id: UUID, lag_range: range) -> list[str]:
        """List the ``lag-<n>`` names from a range that are not yet used by a LAG interface on a router.

        Args:
            router_id: Subscription ID of the router; its FQDN is used as the device name in Netbox.
            lag_range: Numbers for which a candidate name ``lag-<n>`` is generated.

        Returns:
            The candidate names, in range order, for which the device has no LAG interface of that name.

        Raises:
            NotFoundError: If the router's device does not exist in Netbox.
        """
        router_fqdn = Router.from_subscription(router_id).router.router_fqdn
        device = self.get_device_by_name(router_fqdn)

        # Names of the LAG interfaces that already exist on the device.
        existing_lags = self.netbox.dcim.interfaces.filter(device=device.name, type="lag")
        taken_names = {lag_interface["name"] for lag_interface in existing_lags}

        # Keep every feasible name that is still unused, preserving the order of the range.
        candidate_names = (f"lag-{index}" for index in lag_range)
        return [candidate for candidate in candidate_names if candidate not in taken_names]

    def get_available_lags(self, router_id: UUID) -> list[str]:
        """List the LAG names from the feasible IP trunk range that are still unused on the router."""
        ip_trunk_range = FEASIBLE_IP_TRUNK_LAG_RANGE
        return self.get_available_lags_in_range(router_id, ip_trunk_range)

    def get_available_services_lags(self, router_id: UUID) -> list[str]:
        """List the LAG names from the feasible services (Edge port) range that are still unused on the router."""
        services_range = FEASIBLE_SERVICES_LAG_RANGE
        return self.get_available_lags_in_range(router_id, services_range)

    @staticmethod
    def calculate_speed_bits_per_sec(speed: str) -> int:
        """Calculate the numeric part from the speed."""
        numeric_part = int("".join(filter(str.isdigit, speed)))
        # Convert to bits per second
        return numeric_part * 1000000

    def get_available_interfaces(self, router_id: UUID | UUIDstr, speed: str) -> Interfaces:
        """Find the free interfaces of a router that have a given speed.

        "Free" means the interface is neither enabled (reserved) nor marked as connected (allocated).

        NOTE(review): the value returned is the result of the Netbox interface filter, i.e. a collection of
        interfaces rather than the single ``Interfaces`` object the annotation suggests - confirm.

        Args:
            router_id: Subscription ID of the router; its FQDN is used as the device name in Netbox.
            speed: Speed label such as ``"10G"``, converted with ``calculate_speed_bits_per_sec``.

        Raises:
            NotFoundError: If the router's device does not exist in Netbox.
        """
        router_fqdn = Router.from_subscription(router_id).router.router_fqdn
        device = self.get_device_by_name(router_fqdn)
        numeric_speed = self.calculate_speed_bits_per_sec(speed)

        interface_query = {
            "device": device.name,
            "enabled": False,
            "mark_connected": False,
            "speed": numeric_speed,
        }
        return self.netbox.dcim.interfaces.filter(**interface_query)