Skip to content
Snippets Groups Projects
netbox_client.py 12.07 KiB
"""Contain all methods to communicate with the NetBox API endpoint. Data Center Infrastructure Main (DCIM)."""

from uuid import UUID

import pydantic
import pynetbox
from pydantic_forms.types import UUIDstr
from pynetbox.models.dcim import Devices, DeviceTypes, Interfaces

from gso.products.product_types.router import Router
from gso.settings import load_oss_params
from gso.utils.device_info import (
    DEFAULT_SITE,
    FEASIBLE_IP_TRUNK_LAG_RANGE,
    ROUTER_ROLE,
    TierInfo,
)
from gso.utils.exceptions import NotFoundError, WorkflowStateError


class Manufacturer(pydantic.BaseModel):
    """Defines the manufacturer of a device."""

    name: str
    slug: str


class DeviceType(pydantic.BaseModel):
    """Defines the device type.

    The manufacturer should be created first to get the manufacturer id, which is defined here as int.
    """

    manufacturer: int
    model: str
    slug: str


class DeviceRole(pydantic.BaseModel):
    """Defines the role of a device."""

    name: str
    slug: str


class Site(pydantic.BaseModel):
    """Defines the site of a device."""

    name: str
    slug: str


class NetboxClient:
    """Implement all methods to communicate with the Netbox :term:`API`."""

    def __init__(self) -> None:
        """Instantiate a new Netbox client."""
        self.netbox_params = load_oss_params().NETBOX
        self.netbox = pynetbox.api(self.netbox_params.api, self.netbox_params.token)

    def get_all_devices(self) -> list[Devices]:
        """Get all devices in Netbox."""
        return list(self.netbox.dcim.devices.all())

    def get_allocated_interfaces_by_gso_subscription(self, device_name: str, subscription_id: UUID) -> list[Interfaces]:
        """Return all allocated interfaces of a device by name."""
        device = self.get_device_by_name(device_name)
        return self.netbox.dcim.interfaces.filter(
            device_id=device.id,
            enabled=True,
            mark_connected=True,
            description=subscription_id,
        )

    def get_device_by_name(self, device_name: str) -> Devices:
        """Return the device object by name from netbox, or raise not found."""
        device = self.netbox.dcim.devices.get(name=device_name)
        if device is None:
            msg = f"Device: {device_name} not found."
            raise NotFoundError(msg)
        return device

    def get_interface_by_name_and_device(self, iface_name: str, device_name: str) -> Interfaces:
        """Return the interface lists by name and device name from netbox."""
        device = self.get_device_by_name(device_name)
        interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
        if interface is None:
            msg = f"Interface: {iface_name} on device with id: {device.id} not found."
            raise NotFoundError(msg)
        return interface

    def get_interfaces_by_device(self, device_name: str, speed: str) -> list[Interfaces]:
        """Get all interfaces of a device by name and speed that are not reserved and not allocated."""
        device = self.get_device_by_name(device_name)
        return list(
            self.netbox.dcim.interfaces.filter(device_id=device.id, enabled=False, mark_connected=False, speed=speed),
        )

    def create_interface(
        self,
        iface_name: str,
        interface_type: str,
        device_name: str,
        description: str | None = None,
        *,
        enabled: bool = False,
    ) -> Interfaces:
        """Create new interface on a device, where device is defined by name.

        The type parameter can be 1000base-t, 10gbase-t, lag, etc.
        For more details on type definition have  a look in choices.py in the netbox API implementation in module DCIM.
        Returns the new interface object as dict.
        """
        device = self.get_device_by_name(device_name)

        return self.netbox.dcim.interfaces.create(
            name=iface_name,
            type=interface_type,
            enabled=enabled,
            mark_connected=False,
            device=device.id,
            description=description,
        )

    def delete_interface(self, device_name: str, iface_name: str) -> None:
        """Delete an interface from a device by name."""
        interface = self.get_interface_by_name_and_device(iface_name, device_name)
        return interface.delete()

    def create_device_type(self, manufacturer: str, model: str, slug: str) -> DeviceTypes:
        """Create a new device type in Netbox."""
        # First get manufacturer id
        manufacturer_id = int(self.netbox.dcim.manufacturers.get(name=manufacturer).id)
        device_type = DeviceType(manufacturer=manufacturer_id, model=model, slug=slug)
        return self.netbox.dcim.device_types.create(dict(device_type))

    def create_device_role(self, name: str, slug: str) -> DeviceRole:
        """Create a new device role."""
        device_role = DeviceRole(name=name, slug=slug)
        return self.netbox.dcim.device_roles.create(dict(device_role))

    def create_device_site(self, name: str, slug: str) -> Site:
        """Create a new site for devices."""
        device_site = Site(name=name, slug=slug)
        return self.netbox.dcim.sites.create(dict(device_site))

    def create_device_manufacturer(self, name: str, slug: str) -> Manufacturer:
        """Create a new device manufacturer."""
        device_manufacturer = Manufacturer(name=name, slug=slug)
        return self.netbox.dcim.manufacturers.create(dict(device_manufacturer))

    @staticmethod
    def calculate_interface_speed(interface: Interfaces) -> int | None:
        """Calculate the interface speed in bits per second."""
        type_parts = interface.type.value.split("-")
        if "gbase" in type_parts[0]:
            return int("".join(filter(str.isdigit, type_parts[0]))) * 1000000
        return None

    def create_device(self, device_name: str, site_tier: str) -> Devices:
        """Create a new device in Netbox."""
        # Get device type id
        tier_info = TierInfo().get_module_by_name(f"Tier{site_tier}")
        device_type = self.netbox.dcim.device_types.get(model=tier_info.device_type)

        # Get device role id
        device_role = self.netbox.dcim.device_roles.get(name=ROUTER_ROLE["name"])

        # Get site id
        device_site = self.netbox.dcim.sites.get(name=DEFAULT_SITE["name"])

        # Create new device
        device = self.netbox.dcim.devices.create(
            name=device_name,
            device_type=device_type.id,
            role=device_role.id,
            site=device_site.id,
        )
        module_bays = list(self.netbox.dcim.module_bays.filter(device_id=device.id))
        card_type = self.netbox.dcim.module_types.get(model=tier_info.module_type)
        valid_module_bays = [bay for bay in module_bays if int(bay.position) in tier_info.module_bays_slots]
        for module_bay in valid_module_bays:
            self.netbox.dcim.modules.create(
                device=device.id,
                module_bay=module_bay.id,
                module_type=card_type.id,
                status="active",
                enabled=False,
                comments="Installed via pynetbox",
            )

        for interface in self.netbox.dcim.interfaces.filter(device_id=device.id):
            interface.speed = self.calculate_interface_speed(interface)
            interface.enabled = False
            interface.save()

        return device

    def delete_device(self, device_name: str) -> None:
        """Delete device by name."""
        self.netbox.dcim.devices.get(name=device_name).delete()

    def attach_interface_to_lag(
        self,
        device_name: str,
        lag_name: str,
        iface_name: str,
        description: str | None = None,
    ) -> Interfaces:
        """Assign a given interface to a :term:`LAG`.

        Returns the interface object after assignment.
        """
        iface = self.get_interface_by_name_and_device(iface_name, device_name)

        # Get :term:`LAG`
        lag = self.get_interface_by_name_and_device(lag_name, device_name)

        # Assign interface to :term:`LAG`, ensuring it does not already belong to a :term:`LAG`.
        if iface.lag:
            msg = f"The interface: {iface_name} on device: {device_name} already belongs to a LAG: {iface.lag.name}."
            raise WorkflowStateError(
                msg,
            )
        iface.lag = lag.id

        # Set description if provided
        if description:
            iface.description = description

        iface.save()
        return iface

    def reserve_interface(self, device_name: str, iface_name: str) -> Interfaces:
        """Reserve an interface by enabling it."""
        # First get interface from device
        interface = self.get_interface_by_name_and_device(iface_name, device_name)

        # Check if interface is reserved
        if interface.enabled:
            msg = f"The interface: {iface_name} on device: {device_name} is already reserved."
            raise WorkflowStateError(msg)

        # Reserve interface by enabling it
        interface.enabled = True
        interface.save()

        return interface

    def allocate_interface(self, device_name: str, iface_name: str) -> Interfaces:
        """Allocate an interface by marking it as connected."""
        # First get interface from device
        interface = self.get_interface_by_name_and_device(iface_name, device_name)

        # Check if interface is reserved
        if interface.mark_connected:
            msg = f"The interface: {iface_name} on device: {device_name} is already allocated."
            raise WorkflowStateError(msg)

        # Allocate interface by marking it as connected
        interface.mark_connected = True
        interface.save()

        return interface

    def free_interface(self, device_name: str, iface_name: str) -> Interfaces:
        """Free interface by marking disconnect and disable it."""
        # First get interface from device
        interface = self.get_interface_by_name_and_device(iface_name, device_name)
        interface.mark_connected = False
        interface.enabled = False
        interface.description = ""
        interface.save()

        return interface

    def detach_interfaces_from_lag(self, device_name: str, lag_name: str) -> None:
        """Detach all interfaces from a :term:`LAG`."""
        device = self.get_device_by_name(device_name)
        lag = self.netbox.dcim.interfaces.get(device_id=device.id, name=lag_name)
        for interface in self.netbox.dcim.interfaces.filter(
            device_id=device.id,
            lag_id=lag.id,
            enabled=False,
            mark_connected=False,
        ):
            interface.lag = None
            interface.save()

    def get_available_lags(self, router_id: UUID) -> list[str]:
        """Return all available :term:`LAG` not assigned to a device."""
        router_name = Router.from_subscription(router_id).router.router_fqdn
        device = self.get_device_by_name(router_name)

        # Get the existing :term:`LAG` interfaces for the device
        lag_interface_names = [
            interface["name"] for interface in self.netbox.dcim.interfaces.filter(device=device.name, type="lag")
        ]

        # Generate all feasible LAGs
        all_feasible_lags = [f"LAG-{i}" for i in FEASIBLE_IP_TRUNK_LAG_RANGE]

        # Return available LAGs not assigned to the device
        return [lag for lag in all_feasible_lags if lag not in lag_interface_names]

    @staticmethod
    def calculate_speed_bits_per_sec(speed: str) -> int:
        """Extract the numeric part from the speed."""
        numeric_part = int("".join(filter(str.isdigit, speed)))
        # Convert to bits per second
        return numeric_part * 1000000

    def get_available_interfaces(self, router_id: UUID | UUIDstr, speed: str) -> Interfaces:
        """Return all available interfaces of a device filtered by speed."""
        router = Router.from_subscription(router_id).router.router_fqdn
        device = self.get_device_by_name(router)
        speed_bps = self.calculate_speed_bits_per_sec(speed)
        return self.netbox.dcim.interfaces.filter(
            device=device.name,
            enabled=False,
            mark_connected=False,
            speed=speed_bps,
        )